From 0fa00ca804aa4794a5face44a6165b4db5136f9b Mon Sep 17 00:00:00 2001 From: OrangeBao Date: Tue, 21 Nov 2023 11:33:20 +0800 Subject: [PATCH] refactor: refactor the leaf resource manager Signed-off-by: OrangeBao --- .../cluster-manager/cluster_controller.go | 14 +- .../controllers/common_controller.go | 18 +- .../controllers/mcs/auto_mcs_controller.go | 10 +- .../controllers/pod/leaf_pod_controller.go | 2 +- .../controllers/pod/root_pod_controller.go | 176 +++++++++++------- .../controllers/pv/leaf_pv_controller.go | 6 +- .../controllers/pv/oneway_pv_controller.go | 2 +- .../controllers/pv/root_pv_controller.go | 2 +- .../controllers/pvc/leaf_pvc_controller.go | 2 +- .../controllers/pvc/oneway_pvc_controller.go | 2 +- .../controllers/pvc/root_pvc_controller.go | 4 +- .../cluster-manager/node-server/server.go | 4 +- .../utils/leaf_model_handler.go | 13 -- .../utils/leaf_resource_manager.go | 94 ++++++---- pkg/utils/constants.go | 4 +- pkg/utils/k8s.go | 12 +- 16 files changed, 214 insertions(+), 151 deletions(-) diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index f164bbd77..94a99faa6 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -235,9 +235,15 @@ func (c *ClusterController) clearClusterControllers(cluster *kosmosv1alpha1.Clus c.GlobalLeafManager.RemoveLeafResource(cluster.Name) } -func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *kosmosv1alpha1.Cluster, nodes []*corev1.Node, clientDynamic *dynamic.DynamicClient, leafClientset kubernetes.Interface, kosmosClient kosmosversioned.Interface, leafRestConfig *rest.Config) error { - clusterName := c.LeafModelHandler.GetGlobalLeafManagerClusterName(cluster) - c.GlobalLeafManager.AddLeafResource(clusterName, &leafUtils.LeafResource{ +func (c *ClusterController) setupControllers( + mgr manager.Manager, + cluster *kosmosv1alpha1.Cluster, + nodes []*corev1.Node, + clientDynamic *dynamic.DynamicClient, + leafClientset kubernetes.Interface, + kosmosClient kosmosversioned.Interface, + leafRestConfig *rest.Config) error { + c.GlobalLeafManager.AddLeafResource(&leafUtils.LeafResource{ Client: mgr.GetClient(), DynamicClient: clientDynamic, Clientset: leafClientset, @@ -248,7 +254,7 @@ func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *kosmo IgnoreLabels: strings.Split("", ","), EnableServiceAccount: true, RestConfig: leafRestConfig, - }, cluster.Spec.ClusterTreeOptions.LeafModels, nodes) + }, cluster, nodes) nodeResourcesController := controllers.NodeResourcesController{ Leaf: mgr.GetClient(), diff --git a/pkg/clustertree/cluster-manager/controllers/common_controller.go b/pkg/clustertree/cluster-manager/controllers/common_controller.go index 7e4a67058..b86def352 100644 --- a/pkg/clustertree/cluster-manager/controllers/common_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/common_controller.go @@ -44,7 +44,7 @@ type SyncResourcesReconciler struct { } func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - var owners []string + var clusters []string rootobj, err := r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{}) if err != nil && !errors.IsNotFound(err) { klog.Errorf("get %s error: %v", request.NamespacedName, err) @@ -53,16 +53,16 @@ func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconci if err != nil && errors.IsNotFound(err) { // delete all - owners = r.GlobalLeafManager.ListNodeNames() + clusters = r.GlobalLeafManager.ListClusters() } else { - owners = utils.ListResourceOwnersAnnotations(rootobj.GetAnnotations()) + clusters = utils.ListResourceClusters(rootobj.GetAnnotations()) } - for _, owner := range owners { - if r.GlobalLeafManager.Has(owner) { - lr, err := r.GlobalLeafManager.GetLeafResource(owner) + for _, cluster := range clusters { + if r.GlobalLeafManager.HasCluster(cluster) { + lr, err := r.GlobalLeafManager.GetLeafResource(cluster) if err != nil { - klog.Errorf("get lr(owner: %s) err: %v", owner, err) + klog.Errorf("get lr(cluster: %s) err: %v", cluster, err) return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil } if err = r.SyncResource(ctx, request, lr); err != nil { @@ -115,7 +115,7 @@ func (r *SyncResourcesReconciler) SetupWithManager(mgr manager.Manager, gvr sche } func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reconcile.Request, lr *leafUtils.LeafResource) error { - klog.V(5).Infof("Started sync resource processing, ns: %s, name: %s", request.Namespace, request.Name) + klog.V(4).Infof("Started sync resource processing, ns: %s, name: %s", request.Namespace, request.Name) deleteSecretInClient := false @@ -146,7 +146,7 @@ func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reco } return err } - klog.V(5).Infof("%s %q deleted", r.GroupVersionResource.Resource, request.Name) + klog.V(4).Infof("%s %q deleted", r.GroupVersionResource.Resource, request.Name) return nil } diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go index 59543f132..ba6fdd184 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go @@ -217,10 +217,9 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, names continue } - clusterName := clustertreeutils.GetLeafResourceClusterName(newCluster) - leafManager, err := c.GlobalLeafManager.GetLeafResource(clusterName) + leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster.Name) if err != nil { - klog.Errorf("get leafManager for cluster %s failed,Error: %v", clusterName, err) + klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) return err } if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { @@ -255,10 +254,9 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se continue } - clusterName := clustertreeutils.GetLeafResourceClusterName(newCluster) - leafManager, err := c.GlobalLeafManager.GetLeafResource(clusterName) + leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster.Name) if err != nil { - klog.Errorf("get leafManager for cluster %s failed,Error: %v", clusterName, err) + klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) return err } serviceImport := &mcsv1alpha1.ServiceImport{ diff --git a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go index 1e0bd4a75..7320e2747 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go @@ -57,7 +57,7 @@ func (r *LeafPodReconciler) Reconcile(ctx context.Context, request reconcile.Req podutils.FitObjectMeta(&podCopy.ObjectMeta) podCopy.ResourceVersion = "0" if err := r.RootClient.Status().Update(ctx, podCopy); err != nil && !apierrors.IsNotFound(err) { - klog.V(5).Info(errors.Wrap(err, "error while updating pod status in kubernetes")) + klog.V(4).Info(errors.Wrap(err, "error while updating pod status in kubernetes")) return reconcile.Result{RequeueAfter: LeafPodRequeueTime}, nil } } diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index 0488c9990..f527f28cf 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -121,9 +121,9 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req if err := r.Get(ctx, request.NamespacedName, &cachepod); err != nil { if errors.IsNotFound(err) { // TODO: we cannot get leaf pod when we donnot known the node name of pod, so delete all ... - owners := r.GlobalLeafManager.ListNodeNames() - for _, owner := range owners { - lr, err := r.GlobalLeafManager.GetLeafResourceByNodeName(owner) + nodeNames := r.GlobalLeafManager.ListNodes() + for _, nodeName := range nodeNames { + lr, err := r.GlobalLeafManager.GetLeafResourceByNodeName(nodeName) if err != nil { // wait for leaf resource init return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil @@ -168,7 +168,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req // TODO: GlobalLeafResourceManager may not inited.... // belongs to the current node - if !r.GlobalLeafManager.HasNodeName(rootpod.Spec.NodeName) { + if !r.GlobalLeafManager.HasNode(rootpod.Spec.NodeName) { return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil } @@ -291,7 +291,7 @@ func (r *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, lr * return fmt.Errorf("could not get resource gvr(%v) %s from root cluster: %v", gvr, rname, err) } rootannotations := rootobj.GetAnnotations() - rootannotations = utils.AddResourceOwnersAnnotations(rootannotations, lr.ClusterName) + rootannotations = utils.AddResourceClusters(rootannotations, lr.ClusterName) rootobj.SetAnnotations(rootannotations) @@ -325,7 +325,7 @@ func (r *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, lr * klog.Errorf("Failed to create gvr(%v) %v err: %v", gvr, rname, err) return err } - klog.V(5).Infof("Create gvr(%v) %v in %v success", gvr, rname, ns) + klog.V(4).Infof("Create gvr(%v) %v in %v success", gvr, rname, ns) continue } return fmt.Errorf("could not check gvr(%v) %s in external cluster: %v", gvr, rname, err) @@ -556,7 +556,7 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex if secret.Annotations == nil { return fmt.Errorf("parse secret service account error") } - klog.V(5).Infof("secret service-account info: [%v]", secret.Annotations) + klog.V(4).Infof("secret service-account info: [%v]", secret.Annotations) accountName := secret.Annotations[corev1.ServiceAccountNameKey] if accountName == "" { err := fmt.Errorf("get secret of serviceAccount not exits: [%s] [%v]", @@ -573,7 +573,7 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex err := lr.Client.Get(ctx, saKey, sa) if err != nil || sa == nil { - klog.V(5).Infof("get serviceAccount [%v] err: [%v]]", sa, err) + klog.V(4).Infof("get serviceAccount [%v] err: [%v]]", sa, err) sa = &corev1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: accountName, @@ -589,7 +589,7 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex return err } } else { - klog.V(5).Infof("get secret serviceAccount info: [%s] [%v] [%v] [%v]", + klog.V(4).Infof("get secret serviceAccount info: [%s] [%v] [%v] [%v]", sa.Name, sa.CreationTimestamp, sa.Annotations, sa.UID) } secret.UID = sa.UID @@ -611,7 +611,7 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex err = lr.Client.Update(ctx, sa) if err != nil { - klog.V(5).Infof( + klog.V(4).Infof( "update serviceAccount [%v] err: [%v]]", sa, err) return err @@ -619,6 +619,87 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex return nil } +func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.LeafResource, basicPod *corev1.Pod, clusterNodeInfo *leafUtils.ClusterNode) error { + // create secret configmap pvc + secretNames, imagePullSecrets := podutils.GetSecrets(basicPod) + configMaps := podutils.GetConfigmaps(basicPod) + pvcs := podutils.GetPVCs(basicPod) + + ch := make(chan string, 3) + + // configmap + go func() { + if err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { + klog.V(4).Info("Trying to creating dependent configmaps") + if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_CONFIGMAP, configMaps, basicPod, clusterNodeInfo); err != nil { + klog.Error(err) + return false, nil + } + klog.V(4).Infof("Create configmaps %v of %v/%v success", configMaps, basicPod.Namespace, basicPod.Name) + return true, nil + }); err != nil { + ch <- fmt.Sprintf("create configmap failed: %v", err) + } + ch <- "" + }() + + // pvc + go func() { + if err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) { + if !r.Options.OnewayStorageControllers { + klog.V(4).Info("Trying to creating dependent pvc") + if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcs, basicPod, clusterNodeInfo); err != nil { + klog.Error(err) + return false, nil + } + klog.V(4).Infof("Create pvc %v of %v/%v success", pvcs, basicPod.Namespace, basicPod.Name) + } + return true, nil + }); err != nil { + ch <- fmt.Sprintf("create pvc failed: %v", err) + } + ch <- "" + }() + + // secret + go func() { + if err := wait.PollImmediate(500*time.Millisecond, 10*time.Second, func() (bool, error) { + klog.V(4).Info("Trying to creating secret") + if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_SECRET, secretNames, basicPod, clusterNodeInfo); err != nil { + klog.Error(err) + return false, nil + } + + // try to create image pull secrets, ignore err + if errignore := r.createStorageInLeafCluster(ctx, lr, utils.GVR_SECRET, imagePullSecrets, basicPod, clusterNodeInfo); errignore != nil { + klog.Warning(errignore) + } + return true, nil + }); err != nil { + ch <- fmt.Sprintf("create secrets failed: %v", err) + } + ch <- "" + }() + + t1 := <-ch + t2 := <-ch + t3 := <-ch + + errString := "" + errs := []string{t1, t2, t3} + for i := range errs { + if len(errs[i]) > 0 { + errString = errString + errs[i] + } + } + + if len(errString) > 0 { + return fmt.Errorf("%s", errString) + } + + return nil +} + func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) error { if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil { // span.SetStatus(err) @@ -631,7 +712,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf } basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode == leafUtils.ALL) - klog.V(5).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name) + klog.V(4).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name) // create ns ns := &corev1.Namespace{} @@ -643,7 +724,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf // cannot get ns in root cluster, retry return err } - klog.V(5).Infof("Namespace %s does not exist for pod %s, creating it", basicPod.Namespace, basicPod.Name) + klog.V(4).Infof("Namespace %s does not exist for pod %s, creating it", basicPod.Namespace, basicPod.Name) ns := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: basicPod.Namespace, @@ -652,55 +733,20 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf if createErr := lr.Client.Create(ctx, ns); createErr != nil { if !errors.IsAlreadyExists(createErr) { - klog.V(5).Infof("Namespace %s create failed error: %v", basicPod.Namespace, createErr) + klog.V(4).Infof("Namespace %s create failed error: %v", basicPod.Namespace, createErr) return err } else { // namespace already existed, skip create - klog.V(5).Info("Namespace %s already existed: %v", basicPod.Namespace, createErr) + klog.V(4).Info("Namespace %s already existed: %v", basicPod.Namespace, createErr) } } } - // create secret configmap pvc - secretNames, imagePullSecrets := podutils.GetSecrets(basicPod) - configMaps := podutils.GetConfigmaps(basicPod) - pvcs := podutils.GetPVCs(basicPod) - // nolint:errcheck - go wait.PollImmediate(500*time.Millisecond, 10*time.Minute, func() (bool, error) { - klog.V(5).Info("Trying to creating base dependent") - if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_CONFIGMAP, configMaps, basicPod, clusterNodeInfo); err != nil { - klog.Error(err) - return false, nil - } - klog.V(5).Infof("Create configmaps %v of %v/%v success", configMaps, basicPod.Namespace, basicPod.Name) - - if !r.Options.OnewayStorageControllers { - if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcs, basicPod, clusterNodeInfo); err != nil { - klog.Error(err) - return false, nil - } - klog.V(5).Infof("Create pvc %v of %v/%v success", pvcs, basicPod.Namespace, basicPod.Name) - } - return true, nil - }) - var err error - // nolint:errcheck - wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) { - klog.V(5).Info("Trying to creating secret and service account") - - if err = r.createStorageInLeafCluster(ctx, lr, utils.GVR_SECRET, secretNames, basicPod, clusterNodeInfo); err != nil { - klog.Error(err) - return false, nil - } - - // try to create image pull secrets, ignore err - if errignore := r.createStorageInLeafCluster(ctx, lr, utils.GVR_SECRET, imagePullSecrets, basicPod, clusterNodeInfo); errignore != nil { - klog.Warning(errignore) - } - return true, nil - }) - if err != nil { - return fmt.Errorf("create secrets failed: %v", err) + if err := r.createVolumes(ctx, lr, basicPod, clusterNodeInfo); err != nil { + klog.Errorf("Creating Volumes error %+v", basicPod) + return err + } else { + klog.V(4).Infof("Creating Volumes successed %+v", basicPod) } r.convertAuth(ctx, lr, basicPod) @@ -709,23 +755,23 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf r.changeToMasterCoreDNS(ctx, basicPod, r.Options) } - klog.V(5).Infof("Creating pod %+v", basicPod) + klog.V(4).Infof("Creating pod %+v", basicPod) - err = lr.Client.Create(ctx, basicPod) + err := lr.Client.Create(ctx, basicPod) if err != nil { return fmt.Errorf("could not create pod: %v", err) } - klog.V(5).Infof("Create pod %v/%+v success", basicPod.Namespace, basicPod.Name) + klog.V(4).Infof("Create pod %v/%+v success", basicPod.Namespace, basicPod.Name) return nil } func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootpod *corev1.Pod, leafpod *corev1.Pod) error { // TODO: update env // TODOļ¼š update config secret pv pvc ... - klog.V(5).Infof("Updating pod %v/%+v", rootpod.Namespace, rootpod.Name) + klog.V(4).Infof("Updating pod %v/%+v", rootpod.Namespace, rootpod.Name) if !podutils.IsKosmosPod(leafpod) { - klog.V(5).Info("Pod is not created by kosmos tree, ignore") + klog.V(4).Info("Pod is not created by kosmos tree, ignore") return nil } // not used @@ -746,18 +792,18 @@ func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leaf r.changeToMasterCoreDNS(ctx, podCopy, r.Options) } - klog.V(5).Infof("Updating pod %+v", podCopy) + klog.V(4).Infof("Updating pod %+v", podCopy) err := lr.Client.Update(ctx, podCopy) if err != nil { return fmt.Errorf("could not update pod: %v", err) } - klog.V(5).Infof("Update pod %v/%+v success ", rootpod.Namespace, rootpod.Name) + klog.V(4).Infof("Update pod %v/%+v success ", rootpod.Namespace, rootpod.Name) return nil } func (r *RootPodReconciler) DeletePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootnamespacedname types.NamespacedName) error { - klog.V(5).Infof("Deleting pod %v/%+v", rootnamespacedname.Namespace, rootnamespacedname.Name) + klog.V(4).Infof("Deleting pod %v/%+v", rootnamespacedname.Namespace, rootnamespacedname.Name) leafPod := &corev1.Pod{} err := lr.Client.Get(ctx, rootnamespacedname, leafPod) @@ -770,7 +816,7 @@ func (r *RootPodReconciler) DeletePodInLeafCluster(ctx context.Context, lr *leaf } if !podutils.IsKosmosPod(leafPod) { - klog.V(5).Info("Pod is not create by kosmos tree, ignore") + klog.V(4).Info("Pod is not create by kosmos tree, ignore") return nil } @@ -778,11 +824,11 @@ func (r *RootPodReconciler) DeletePodInLeafCluster(ctx context.Context, lr *leaf err = lr.Client.Delete(ctx, leafPod, deleteOption) if err != nil { if errors.IsNotFound(err) { - klog.V(5).Infof("Tried to delete pod %s/%s, but it did not exist in the cluster", leafPod.Namespace, leafPod.Name) + klog.V(4).Infof("Tried to delete pod %s/%s, but it did not exist in the cluster", leafPod.Namespace, leafPod.Name) return nil } return fmt.Errorf("could not delete pod: %v", err) } - klog.V(5).Infof("Delete pod %v/%+v success", leafPod.Namespace, leafPod.Name) + klog.V(4).Infof("Delete pod %v/%+v success", leafPod.Namespace, leafPod.Name) return nil } diff --git a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go index 3dca241ed..f41ab3406 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/leaf_pv_controller.go @@ -102,7 +102,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ rootPV.Spec.ClaimRef.UID = rootPVC.UID rootPV.Spec.ClaimRef.ResourceVersion = rootPVC.ResourceVersion - utils.AddResourceOwnersAnnotations(rootPV.Annotations, l.ClusterName) + utils.AddResourceClusters(rootPV.Annotations, l.ClusterName) rootPV, err = l.RootClientSet.CoreV1().PersistentVolumes().Create(ctx, rootPV, metav1.CreateOptions{}) if err != nil || rootPV == nil { @@ -113,7 +113,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ return reconcile.Result{}, nil } - if !utils.HasResourceOwnersAnnotations(rootPV.Annotations, l.ClusterName) { + if !utils.HasResourceClusters(rootPV.Annotations, l.ClusterName) { klog.Errorf("meet the same name root pv name: %q !", request.NamespacedName.Name) return reconcile.Result{}, nil } @@ -152,7 +152,7 @@ func (l *LeafPVController) Reconcile(ctx context.Context, request reconcile.Requ pvCopy.Spec.NodeAffinity = rootPV.Spec.NodeAffinity pvCopy.UID = rootPV.UID pvCopy.ResourceVersion = rootPV.ResourceVersion - utils.AddResourceOwnersAnnotations(pvCopy.Annotations, l.ClusterName) + utils.AddResourceClusters(pvCopy.Annotations, l.ClusterName) if utils.IsPVEqual(rootPV, pvCopy) { return reconcile.Result{}, nil diff --git a/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go index 30b18454d..7b0967cb9 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go @@ -162,7 +162,7 @@ func (c *OnewayPVController) ensureLeafPV(ctx context.Context, leaf *leafUtils.L newPV.Spec.ClaimRef.UID = pvc.UID anno := newPV.GetAnnotations() - anno = utils.AddResourceOwnersAnnotations(anno, leaf.ClusterName) + anno = utils.AddResourceClusters(anno, leaf.ClusterName) anno[utils.KosmosGlobalLabel] = "true" newPV.SetAnnotations(anno) diff --git a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go index d7d2f2045..0936497e7 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go @@ -52,7 +52,7 @@ func (r *RootPVController) SetupWithManager(mgr manager.Manager) error { } pv := deleteEvent.Object.(*v1.PersistentVolume) - clusters := utils.ListResourceOwnersAnnotations(pv.Annotations) + clusters := utils.ListResourceClusters(pv.Annotations) if len(clusters) == 0 { klog.Warningf("pv leaf %q doesn't existed", deleteEvent.Object.GetName()) return false diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go index beb37fce1..821ee7687 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go @@ -96,7 +96,7 @@ func (l *LeafPVCController) Reconcile(ctx context.Context, request reconcile.Req delete(pvcCopy.Annotations, utils.PVCSelectedNodeKey) pvcCopy.ResourceVersion = rootPVC.ResourceVersion pvcCopy.OwnerReferences = rootPVC.OwnerReferences - utils.AddResourceOwnersAnnotations(pvcCopy.Annotations, l.ClusterName) + utils.AddResourceClusters(pvcCopy.Annotations, l.ClusterName) pvcCopy.Spec = rootPVC.Spec klog.V(4).Infof("rootPVC %+v\n, pvc %+v", rootPVC, pvcCopy) diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go index 0a64b4b1e..fe8e7d654 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go @@ -153,7 +153,7 @@ func (c *OnewayPVCController) ensureLeafPVC(ctx context.Context, leaf *leafUtils newPVC := pvc.DeepCopy() anno := newPVC.GetAnnotations() - anno = utils.AddResourceOwnersAnnotations(anno, leaf.ClusterName) + anno = utils.AddResourceClusters(anno, leaf.ClusterName) anno[utils.KosmosGlobalLabel] = "true" newPVC.SetAnnotations(anno) diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go index bd7d1ae72..eba645ceb 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go @@ -44,7 +44,7 @@ func (r *RootPVCController) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{}, nil } - clusters := utils.ListResourceOwnersAnnotations(pvc.Annotations) + clusters := utils.ListResourceClusters(pvc.Annotations) if len(clusters) == 0 { klog.V(4).Infof("pvc leaf %q: %q doesn't existed", request.NamespacedName.Namespace, request.NamespacedName.Name) return reconcile.Result{RequeueAfter: RootPVCRequeueTime}, nil @@ -116,7 +116,7 @@ func (r *RootPVCController) SetupWithManager(mgr manager.Manager) error { } pvc := deleteEvent.Object.(*v1.PersistentVolumeClaim) - clusters := utils.ListResourceOwnersAnnotations(pvc.Annotations) + clusters := utils.ListResourceClusters(pvc.Annotations) if len(clusters) == 0 { klog.V(4).Infof("pvc leaf %q: %q doesn't existed", deleteEvent.Object.GetNamespace(), deleteEvent.Object.GetName()) return false diff --git a/pkg/clustertree/cluster-manager/node-server/server.go b/pkg/clustertree/cluster-manager/node-server/server.go index 450ebdb35..6b2d00fae 100644 --- a/pkg/clustertree/cluster-manager/node-server/server.go +++ b/pkg/clustertree/cluster-manager/node-server/server.go @@ -59,9 +59,9 @@ func (n *NodeServer) getClient(ctx context.Context, namespace string, podName st return nil, nil, err } - nodename := rootPod.Spec.NodeName + nodeName := rootPod.Spec.NodeName - lr, err := n.GlobalLeafManager.GetLeafResourceByNodeName(nodename) + lr, err := n.GlobalLeafManager.GetLeafResourceByNodeName(nodeName) if err != nil { return nil, nil, err } diff --git a/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go b/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go index a2695692f..8938ada02 100644 --- a/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go +++ b/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go @@ -21,9 +21,6 @@ type LeafModelHandler interface { // GetLeafModelType returns the leafModelType for a Cluster GetLeafModelType() LeafModelType - // GetGlobalLeafManagerClusterName returns the clusterName for a Cluster's GlobalLeafManager - GetGlobalLeafManagerClusterName(cluster *kosmosv1alpha1.Cluster) string - // GetLeafNodes returns nodes in leaf cluster by the rootNode GetLeafNodes(ctx context.Context, rootNode *corev1.Node) (*corev1.NodeList, error) @@ -138,12 +135,6 @@ func (h AggregationModelHandler) GetLeafNodes(ctx context.Context, _ *corev1.Nod return nodesInLeaf, nil } -// GetGlobalLeafManagerClusterName returns the clusterName for a Cluster's GlobalLeafManager -func (h AggregationModelHandler) GetGlobalLeafManagerClusterName(cluster *kosmosv1alpha1.Cluster) string { - clusterName := fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name) - return clusterName -} - // GetLeafModelType returns the leafModelType for a Cluster func (h AggregationModelHandler) GetLeafModelType() LeafModelType { return AggregationModel @@ -252,10 +243,6 @@ func (h DispersionModelHandler) GetLeafNodes(ctx context.Context, rootNode *core return nodesInLeaf, nil } -func (h DispersionModelHandler) GetGlobalLeafManagerClusterName(cluster *kosmosv1alpha1.Cluster) string { - return cluster.Name -} - func (h DispersionModelHandler) GetLeafModelType() LeafModelType { return DispersionModel } diff --git a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go index 52db1ec2e..5c40a96d0 100644 --- a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go +++ b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go @@ -2,6 +2,7 @@ package utils import ( "fmt" + "strings" "sync" corev1 "k8s.io/api/core/v1" @@ -47,17 +48,22 @@ type LeafResource struct { } type LeafResourceManager interface { - AddLeafResource(string, *LeafResource, []kosmosv1alpha1.LeafModel, []*corev1.Node) - RemoveLeafResource(string) + AddLeafResource(lr *LeafResource, cluster *kosmosv1alpha1.Cluster, node []*corev1.Node) + RemoveLeafResource(clusterName string) // get leafresource by cluster name - GetLeafResource(string) (*LeafResource, error) - // get leafresource by knode name - GetLeafResourceByNodeName(string) (*LeafResource, error) - // judge if the map has leafresource of nodename - Has(string) bool - HasNodeName(string) bool - ListNodeNames() []string - GetClusterNode(string) *ClusterNode + GetLeafResource(clusterName string) (*LeafResource, error) + // get leafresource by node name + GetLeafResourceByNodeName(nodeName string) (*LeafResource, error) + // determine if the cluster is present in the map + HasCluster(clusterName string) bool + // determine if the node is present in the map + HasNode(nodeName string) bool + // list all all node name + ListNodes() []string + // list all all cluster name + ListClusters() []string + // get ClusterNode(struct) by node name + GetClusterNode(nodeName string) *ClusterNode } type leafResourceManager struct { @@ -65,11 +71,8 @@ type leafResourceManager struct { leafResourceManagersLock sync.Mutex } -func GetLeafResourceClusterName(cluster *kosmosv1alpha1.Cluster) string { - if cluster.Spec.ClusterTreeOptions.LeafModels != nil { - return cluster.Name - } - return fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name) +func trimNamePrefix(name string) string { + return strings.TrimPrefix(name, utils.KosmosNodePrefix) } func has(clusternodes []ClusterNode, target string) bool { @@ -90,9 +93,14 @@ func getClusterNode(clusternodes []ClusterNode, target string) *ClusterNode { return nil } -func (l *leafResourceManager) AddLeafResource(clustername string, lptr *LeafResource, leafModels []kosmosv1alpha1.LeafModel, nodes []*corev1.Node) { +func (l *leafResourceManager) AddLeafResource(lptr *LeafResource, cluster *kosmosv1alpha1.Cluster, nodes []*corev1.Node) { l.leafResourceManagersLock.Lock() defer l.leafResourceManagersLock.Unlock() + + clusterName := cluster.Name + + leafModels := cluster.Spec.ClusterTreeOptions.LeafModels + clusterNodes := []ClusterNode{} for i, n := range nodes { if leafModels != nil && len(leafModels[i].NodeSelector.NodeName) > 0 { @@ -101,50 +109,51 @@ func (l *leafResourceManager) AddLeafResource(clustername string, lptr *LeafReso LeafMode: Node, }) // } else if leafModels != nil && leafModels[i].NodeSelector.LabelSelector != nil { - // // TODO: + // TODO: support labelselector } else { clusterNodes = append(clusterNodes, ClusterNode{ - NodeName: n.Name, + NodeName: trimNamePrefix(n.Name), LeafMode: ALL, }) } } lptr.Nodes = clusterNodes - l.resourceMap[clustername] = lptr + l.resourceMap[clusterName] = lptr } -func (l *leafResourceManager) RemoveLeafResource(clustername string) { +func (l *leafResourceManager) RemoveLeafResource(clusterName string) { l.leafResourceManagersLock.Lock() defer l.leafResourceManagersLock.Unlock() - delete(l.resourceMap, clustername) + delete(l.resourceMap, clusterName) } -func (l *leafResourceManager) GetLeafResource(clustername string) (*LeafResource, error) { +func (l *leafResourceManager) GetLeafResource(clusterName string) (*LeafResource, error) { l.leafResourceManagersLock.Lock() defer l.leafResourceManagersLock.Unlock() - if m, ok := l.resourceMap[clustername]; ok { + if m, ok := l.resourceMap[clusterName]; ok { return m, nil } else { - return nil, fmt.Errorf("cannot get leaf resource, clustername: %s", clustername) + return nil, fmt.Errorf("cannot get leaf resource, clusterName: %s", clusterName) } } -func (l *leafResourceManager) GetLeafResourceByNodeName(nodename string) (*LeafResource, error) { +func (l *leafResourceManager) GetLeafResourceByNodeName(nodeName string) (*LeafResource, error) { l.leafResourceManagersLock.Lock() defer l.leafResourceManagersLock.Unlock() - + nodeName = trimNamePrefix(nodeName) for k := range l.resourceMap { - if has(l.resourceMap[k].Nodes, nodename) { + if has(l.resourceMap[k].Nodes, nodeName) { return l.resourceMap[k], nil } } - return nil, fmt.Errorf("cannot get leaf resource, nodename: %s", nodename) + return nil, fmt.Errorf("cannot get leaf resource, nodeName: %s", nodeName) } -func (l *leafResourceManager) HasNodeName(nodename string) bool { +func (l *leafResourceManager) HasNode(nodeName string) bool { + nodeName = trimNamePrefix(nodeName) for k := range l.resourceMap { - if has(l.resourceMap[k].Nodes, nodename) { + if has(l.resourceMap[k].Nodes, nodeName) { return true } } @@ -152,7 +161,7 @@ func (l *leafResourceManager) HasNodeName(nodename string) bool { return false } -func (l *leafResourceManager) Has(clustername string) bool { +func (l *leafResourceManager) HasCluster(clustername string) bool { for k := range l.resourceMap { if k == clustername { return true @@ -162,19 +171,34 @@ func (l *leafResourceManager) Has(clustername string) bool { return false } -func (l *leafResourceManager) GetClusterNode(nodename string) *ClusterNode { +func (l *leafResourceManager) GetClusterNode(nodeName string) *ClusterNode { + nodeName = trimNamePrefix(nodeName) for k := range l.resourceMap { - if clusterNode := getClusterNode(l.resourceMap[k].Nodes, nodename); clusterNode != nil { + if clusterNode := getClusterNode(l.resourceMap[k].Nodes, nodeName); clusterNode != nil { return clusterNode } } return nil } -func (l *leafResourceManager) ListNodeNames() []string { +func (l *leafResourceManager) ListClusters() []string { + l.leafResourceManagersLock.Lock() + defer l.leafResourceManagersLock.Unlock() + keys := make([]string, 0) + for k := range l.resourceMap { + if len(k) == 0 { + continue + } + + keys = append(keys, k) + } + return keys +} + +func (l *leafResourceManager) ListNodes() []string { l.leafResourceManagersLock.Lock() defer l.leafResourceManagersLock.Unlock() - keys := make([]string, 0, len(l.resourceMap)) + keys := make([]string, 0) for k := range l.resourceMap { if len(k) == 0 { continue diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index 603a0e550..37ff95e98 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -75,7 +75,9 @@ const ( KosmosTrippedLabels = "kosmos-io/tripped" KosmosPvcLabelSelector = "kosmos-io/label-selector" - KosmosResourceOwnersAnnotations = "kosmos-io/cluster-owners" + // on resorce (pv, configmap, secret), represents which cluster this resource belongs to + KosmosResourceOwnersAnnotations = "kosmos-io/cluster-owners" + // on node, represents which cluster this node belongs to KosmosNodeOwnedByClusterAnnotations = "kosmos-io/owned-by-cluster" KosmosDaemonsetAllowAnnotations = "kosmos-io/daemonset-allow" diff --git a/pkg/utils/k8s.go b/pkg/utils/k8s.go index 7a6be8baf..168a4918d 100644 --- a/pkg/utils/k8s.go +++ b/pkg/utils/k8s.go @@ -282,7 +282,7 @@ func IsObjectUnstructuredGlobal(obj map[string]string) bool { return false } -func AddResourceOwnersAnnotations(anno map[string]string, owner string) map[string]string { +func AddResourceClusters(anno map[string]string, clusterName string) map[string]string { if anno == nil { anno = map[string]string{} } @@ -295,28 +295,28 @@ func AddResourceOwnersAnnotations(anno map[string]string, owner string) map[stri continue } newowners = append(newowners, v) - if v == owner { + if v == clusterName { // already existed flag = true } } if !flag { - newowners = append(newowners, owner) + newowners = append(newowners, clusterName) } anno[KosmosResourceOwnersAnnotations] = strings.Join(newowners, ",") return anno } -func HasResourceOwnersAnnotations(anno map[string]string, owner string) bool { +func HasResourceClusters(anno map[string]string, clusterName string) bool { if anno == nil { anno = map[string]string{} } owners := strings.Split(anno[KosmosResourceOwnersAnnotations], ",") for _, v := range owners { - if v == owner { + if v == clusterName { // already existed return true } @@ -324,7 +324,7 @@ func HasResourceOwnersAnnotations(anno map[string]string, owner string) bool { return false } -func ListResourceOwnersAnnotations(anno map[string]string) []string { +func ListResourceClusters(anno map[string]string) []string { if anno == nil || anno[KosmosResourceOwnersAnnotations] == "" { return []string{} }