From b63545d4b5d8962ae28df66f9006c852de2417d3 Mon Sep 17 00:00:00 2001 From: duanmengkk Date: Sun, 11 Aug 2024 22:32:53 +0800 Subject: [PATCH] support multi cluster for one kubernetes Signed-off-by: duanmengkk --- .../cluster-manager/app/manager.go | 75 +++++++----- .../cluster-manager/cluster_controller.go | 27 +++-- .../controllers/common_controller.go | 23 +++- .../controllers/mcs/auto_mcs_controller.go | 20 ++-- .../controllers/pod/root_pod_controller.go | 109 ++++++++++++++---- .../controllers/pv/oneway_pv_controller.go | 54 ++++++--- .../controllers/pv/root_pv_controller.go | 31 ++++- .../controllers/pvc/oneway_pvc_controller.go | 45 +++++--- .../controllers/pvc/root_pvc_controller.go | 34 +++++- .../cluster-manager/node-server/server.go | 19 ++- .../utils/leaf_client_resource_manager.go | 92 +++++++++++++++ .../utils/leaf_resource_manager.go | 20 +--- pkg/utils/constants.go | 33 +++--- 13 files changed, 429 insertions(+), 153 deletions(-) create mode 100644 pkg/clustertree/cluster-manager/utils/leaf_client_resource_manager.go diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index 78ddea2ed..a3e89bb3f 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -118,7 +118,8 @@ func leaderElectionRun(ctx context.Context, opts *options.Options) error { } func run(ctx context.Context, opts *options.Options) error { - globalleafManager := leafUtils.GetGlobalLeafResourceManager() + globalLeafResourceManager := leafUtils.GetGlobalLeafResourceManager() + globalLeafClientManager := leafUtils.GetGlobalLeafClientResourceManager() config, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig) if err != nil { @@ -163,13 +164,14 @@ func run(ctx context.Context, opts *options.Options) error { // add cluster controller clusterController := clusterManager.ClusterController{ - Root: mgr.GetClient(), - RootDynamic: dynamicClient, - RootClientset: rootClient, - EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName), - Options: opts, - RootResourceManager: rootResourceManager, - GlobalLeafManager: globalleafManager, + Root: mgr.GetClient(), + RootDynamic: dynamicClient, + RootClientset: rootClient, + EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName), + Options: opts, + RootResourceManager: rootResourceManager, + GlobalLeafResourceManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, } if err = clusterController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting %s: %v", clusterManager.ControllerName, err) @@ -191,13 +193,14 @@ func run(ctx context.Context, opts *options.Options) error { // add auto create mcs resources controller autoCreateMCSController := mcs.AutoCreateMCSController{ - RootClient: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(mcs.AutoCreateMCSControllerName), - Logger: mgr.GetLogger(), - AutoCreateMCSPrefix: opts.AutoCreateMCSPrefix, - RootKosmosClient: rootKosmosClient, - GlobalLeafManager: globalleafManager, - ReservedNamespaces: opts.ReservedNamespaces, + RootClient: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(mcs.AutoCreateMCSControllerName), + Logger: mgr.GetLogger(), + AutoCreateMCSPrefix: opts.AutoCreateMCSPrefix, + RootKosmosClient: rootKosmosClient, + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, + ReservedNamespaces: opts.ReservedNamespaces, } if err = autoCreateMCSController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting %s: %v", mcs.AutoCreateMCSControllerName, err) @@ -217,7 +220,9 @@ func run(ctx context.Context, opts *options.Options) error { // init rootPodController rootPodReconciler := podcontrollers.RootPodReconciler{ - GlobalLeafManager: globalleafManager, + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, + RootClient: mgr.GetClient(), DynamicRootClient: dynamicClient, Options: opts, @@ -227,16 +232,18 @@ func run(ctx context.Context, opts *options.Options) error { } rootPVCController := pvc.RootPVCController{ - RootClient: mgr.GetClient(), - GlobalLeafManager: globalleafManager, + RootClient: mgr.GetClient(), + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, } if err := rootPVCController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting root pvc controller %v", err) } rootPVController := pv.RootPVController{ - RootClient: mgr.GetClient(), - GlobalLeafManager: globalleafManager, + RootClient: mgr.GetClient(), + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, } if err := rootPVController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting root pv controller %v", err) @@ -244,18 +251,20 @@ func run(ctx context.Context, opts *options.Options) error { if len(os.Getenv("USE-ONEWAY-STORAGE")) > 0 { onewayPVController := pv.OnewayPVController{ - Root: mgr.GetClient(), - RootDynamic: dynamicClient, - GlobalLeafManager: globalleafManager, + Root: mgr.GetClient(), + RootDynamic: dynamicClient, + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, } if err := onewayPVController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting oneway pv controller %v", err) } onewayPVCController := pvc.OnewayPVCController{ - Root: mgr.GetClient(), - RootDynamic: dynamicClient, - GlobalLeafManager: globalleafManager, + Root: mgr.GetClient(), + RootDynamic: dynamicClient, + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, } if err := onewayPVCController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting oneway pvc controller %v", err) @@ -265,10 +274,11 @@ func run(ctx context.Context, opts *options.Options) error { // init commonController for i, gvr := range controllers.SYNC_GVRS { commonController := controllers.SyncResourcesReconciler{ - GlobalLeafManager: globalleafManager, - GroupVersionResource: gvr, - Object: controllers.SYNC_OBJS[i], - DynamicRootClient: dynamicClient, + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, + GroupVersionResource: gvr, + Object: controllers.SYNC_OBJS[i], + DynamicRootClient: dynamicClient, // DynamicLeafClient: clientDynamic, ControllerName: "async-controller-" + gvr.Resource, // Namespace: cluster.Spec.Namespace, @@ -286,8 +296,9 @@ func run(ctx context.Context, opts *options.Options) error { }() nodeServer := nodeserver.NodeServer{ - RootClient: mgr.GetClient(), - GlobalLeafManager: globalleafManager, + RootClient: mgr.GetClient(), + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, } go func() { if err := nodeServer.Start(ctx, opts); err != nil { diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index f294e817f..b430516a7 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -62,7 +62,9 @@ type ClusterController struct { RootResourceManager *utils.ResourceManager - GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafResourceManager leafUtils.LeafResourceManager + + GlobalLeafClientManager leafUtils.LeafClientResourceManager LeafModelHandler leafUtils.LeafModelHandler } @@ -232,7 +234,7 @@ func (c *ClusterController) clearClusterControllers(cluster *kosmosv1alpha1.Clus delete(c.ManagerCancelFuncs, cluster.Name) delete(c.ControllerManagers, cluster.Name) - c.GlobalLeafManager.RemoveLeafResource(cluster.Name) + c.GlobalLeafResourceManager.RemoveLeafResource(cluster.Name) } func (c *ClusterController) setupControllers( @@ -244,22 +246,25 @@ func (c *ClusterController) setupControllers( leafClientset kubernetes.Interface, leafKosmosClient kosmosversioned.Interface, leafRestConfig *rest.Config) error { - c.GlobalLeafManager.AddLeafResource(&leafUtils.LeafResource{ - Client: mgr.GetClient(), - DynamicClient: clientDynamic, - Clientset: leafClientset, - KosmosClient: leafKosmosClient, - ClusterName: cluster.Name, + c.GlobalLeafResourceManager.AddLeafResource(&leafUtils.LeafResource{ + Cluster: cluster, // TODO: define node options Namespace: "", IgnoreLabels: strings.Split("", ","), EnableServiceAccount: true, - RestConfig: leafRestConfig, - }, cluster, nodes) + }, nodes) + + c.GlobalLeafClientManager.AddLeafClientResource(&leafUtils.LeafClientResource{ + Client: mgr.GetClient(), + DynamicClient: clientDynamic, + Clientset: leafClientset, + KosmosClient: leafKosmosClient, + RestConfig: leafRestConfig, + }, cluster) nodeResourcesController := controllers.NodeResourcesController{ Leaf: mgr.GetClient(), - GlobalLeafManager: c.GlobalLeafManager, + GlobalLeafManager: c.GlobalLeafResourceManager, Root: c.Root, RootClientset: c.RootClientset, Nodes: nodes, diff --git a/pkg/clustertree/cluster-manager/controllers/common_controller.go b/pkg/clustertree/cluster-manager/controllers/common_controller.go index 3eba7d8af..adaa8c651 100644 --- a/pkg/clustertree/cluster-manager/controllers/common_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/common_controller.go @@ -2,6 +2,7 @@ package controllers import ( "context" + "fmt" "time" corev1 "k8s.io/api/core/v1" @@ -40,7 +41,8 @@ type SyncResourcesReconciler struct { client.Client - GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager leafUtils.LeafClientResourceManager } func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { @@ -65,7 +67,13 @@ func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconci klog.Errorf("get lr(cluster: %s) err: %v", cluster, err) return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil } - if err = r.SyncResource(ctx, request, lr); err != nil { + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + + if err = r.SyncResource(ctx, request, lcr); err != nil { klog.Errorf("sync resource %s error: %v", request.NamespacedName, err) return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil } @@ -114,7 +122,7 @@ func (r *SyncResourcesReconciler) SetupWithManager(mgr manager.Manager, gvr sche return nil } -func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reconcile.Request, lr *leafUtils.LeafResource) error { +func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reconcile.Request, lr *leafUtils.LeafClientResource) error { klog.V(4).Infof("Started sync resource processing, ns: %s, name: %s", request.Namespace, request.Name) deleteSecretInClient := false @@ -191,3 +199,12 @@ func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reco } return nil } + +func (r *SyncResourcesReconciler) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := r.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go index 0ef3e445c..7caa9a9a2 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go @@ -34,11 +34,12 @@ const AutoCreateMCSControllerName = "auto-mcs-controller" // AutoCreateMCSController watches services in root cluster and auto create serviceExport and serviceImport in leaf cluster type AutoCreateMCSController struct { - RootClient client.Client - RootKosmosClient kosmosversioned.Interface - EventRecorder record.EventRecorder - Logger logr.Logger - GlobalLeafManager clustertreeutils.LeafResourceManager + RootClient client.Client + RootKosmosClient kosmosversioned.Interface + EventRecorder record.EventRecorder + Logger logr.Logger + GlobalLeafManager clustertreeutils.LeafResourceManager + GlobalLeafClientManager clustertreeutils.LeafClientResourceManager // AutoCreateMCSPrefix are the prefix of the namespace for service to auto create in leaf cluster AutoCreateMCSPrefix []string // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources @@ -225,11 +226,13 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(namespace string, name str continue } - leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster.Name) + actualClusterName := clustertreeutils.GetActualClusterName(newCluster) + leafManager, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) if err != nil { klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) return err } + if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Delete serviceImport in leaf cluster failed %s/%s, Error: %v", namespace, name, err) @@ -262,9 +265,10 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(service *corev1.Service continue } - leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster.Name) + actualClusterName := clustertreeutils.GetActualClusterName(newCluster) + leafManager, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) if err != nil { - klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) + klog.Errorf("get leafClientManager for cluster %s failed,Error: %v", cluster.Name, err) return err } diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index eaf33cfd0..efc9ce699 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -30,6 +30,7 @@ import ( "github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options" kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset" + clustertreeutils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/convertpolicy" @@ -48,7 +49,8 @@ type RootPodReconciler struct { DynamicRootClient dynamic.Interface envResourceManager utils.EnvResourceManager - GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager clustertreeutils.LeafClientResourceManager Options *options.Options } @@ -195,8 +197,14 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{}, nil } + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + leafPod := &corev1.Pod{} - err = lr.Client.Get(ctx, request.NamespacedName, leafPod) + err = lcr.Client.Get(ctx, request.NamespacedName, leafPod) // create pod in leaf if err != nil { @@ -295,7 +303,7 @@ func (r *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, lr * return fmt.Errorf("could not get resource gvr(%v) %s from root cluster: %v", gvr, rname, err) } rootannotations := rootobj.GetAnnotations() - rootannotations = utils.AddResourceClusters(rootannotations, lr.ClusterName) + rootannotations = utils.AddResourceClusters(rootannotations, lr.Cluster.Name) rootobj.SetAnnotations(rootannotations) @@ -312,7 +320,12 @@ func (r *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, lr * return err } - _, err = lr.DynamicClient.Resource(gvr).Namespace(ns).Get(ctx, unstructuredObj.GetName(), metav1.GetOptions{}) + lcr, err := r.leafClientResource(lr) + if err != nil { + return fmt.Errorf("failed to get leaf client resource: %v", err) + } + + _, err = lcr.DynamicClient.Resource(gvr).Namespace(ns).Get(ctx, unstructuredObj.GetName(), metav1.GetOptions{}) if err == nil { // already existed, so skip continue @@ -323,7 +336,7 @@ func (r *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, lr * return err } - _, err = lr.DynamicClient.Resource(gvr).Namespace(ns).Create(ctx, unstructuredObj, metav1.CreateOptions{}) + _, err = lcr.DynamicClient.Resource(gvr).Namespace(ns).Create(ctx, unstructuredObj, metav1.CreateOptions{}) if err != nil { if errors.IsAlreadyExists(err) { continue @@ -372,8 +385,15 @@ func (r *RootPodReconciler) createSATokenInLeafCluster(ctx context.Context, lr * Namespace: ns, Name: rootSecretName, } + + lcr, err := r.leafClientResource(lr) + if err != nil { + // wait for leaf resource init + return "", fmt.Errorf("failed to get leaf resource: %v", err) + } + clientSecret := &corev1.Secret{} - err = lr.Client.Get(ctx, csKey, clientSecret) + err = lcr.Client.Get(ctx, csKey, clientSecret) if err != nil && !errors.IsNotFound(err) { return "", fmt.Errorf("could not check secret %s in member cluster: %v", csKey.Name, err) } @@ -457,9 +477,15 @@ func (r *RootPodReconciler) createConfigMapInLeafCluster(ctx context.Context, lr Name: memberConfigmapKeyName, } + lcr, err := r.leafClientResource(lr) + if err != nil { + // wait for leaf resource init + return "", fmt.Errorf("failed to get leaf resource: %v", err) + } + memberConfigMap := &corev1.ConfigMap{} - err := lr.Client.Get(ctx, configmapKey, memberConfigMap) + err = lcr.Client.Get(ctx, configmapKey, memberConfigMap) if err != nil && !errors.IsNotFound(err) { return "", fmt.Errorf("could not check configmap %s in member cluster: %v", configmapKey.Name, err) } @@ -498,9 +524,13 @@ func (r *RootPodReconciler) createSecretInLeafCluster(ctx context.Context, lr *l Name: secretName, } - memberSecret := &corev1.Secret{} + lcr, err := r.leafClientResource(lr) + if err != nil { + return "", fmt.Errorf("could not get leaf resource for cluster %s: %v", lr.Cluster.Name, err) + } - err := lr.Client.Get(ctx, secretKey, memberSecret) + memberSecret := &corev1.Secret{} + err = lcr.Client.Get(ctx, secretKey, memberSecret) if err != nil && !errors.IsNotFound(err) { return "", fmt.Errorf("could not check secret %s in member cluster: %v", secretKey.Name, err) } @@ -657,7 +687,13 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex Name: accountName, } - err := lr.Client.Get(ctx, saKey, sa) + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("GetLeafResource err: %v", err) + return err + } + + err = lcr.Client.Get(ctx, saKey, sa) if err != nil || sa == nil { klog.V(4).Infof("Get serviceAccount [%v] err: [%v]]", sa, err) sa = &corev1.ServiceAccount{ @@ -666,7 +702,7 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex Namespace: ns, }, } - err := lr.Client.Create(ctx, sa) + err := lcr.Client.Create(ctx, sa) klog.Infof("Create serviceAccount [%v] err: [%v]", sa, err) if err != nil { if errors.IsAlreadyExists(err) { @@ -691,8 +727,13 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex secret.Type = corev1.SecretTypeOpaque } - err := lr.Client.Create(ctx, secret) + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return err + } + err = lcr.Client.Create(ctx, secret) if err != nil { if errors.IsAlreadyExists(err) { return nil @@ -703,7 +744,7 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex // the secret-token cannot be mounted to the default-sa of the leaf cluster if accountName != utils.DefaultServiceAccountName { saCopy := sa.DeepCopy() - err := updateServiceAccountObjectReferenceRetry(ctx, saCopy, lr, secret.Name) + err := updateServiceAccountObjectReferenceRetry(ctx, saCopy, lcr, secret.Name) if err != nil { klog.Errorf("update serviceAccount [%v] err: [%v]]", saCopy, err) return err @@ -713,10 +754,20 @@ func (r *RootPodReconciler) createServiceAccountInLeafCluster(ctx context.Contex return nil } +func (r *RootPodReconciler) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := r.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} + // nolint:dupl -func updateServiceAccountObjectReferenceRetry(ctx context.Context, saCopy *corev1.ServiceAccount, lr *leafUtils.LeafResource, secretName string) error { +func updateServiceAccountObjectReferenceRetry(ctx context.Context, saCopy *corev1.ServiceAccount, lr *leafUtils.LeafClientResource, secretName string) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { saCopy.Secrets = []corev1.ObjectReference{{Name: secretName}} + err := lr.Client.Update(ctx, saCopy) if err == nil { return nil @@ -871,12 +922,18 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf klog.Errorf("Converting pod error: %v", err) } + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return err + } + // create ns ns := &corev1.Namespace{} nsKey := types.NamespacedName{ Name: basicPod.Namespace, } - if err := lr.Client.Get(ctx, nsKey, ns); err != nil { + if err := lcr.Client.Get(ctx, nsKey, ns); err != nil { if !errors.IsNotFound(err) { // cannot get ns in root cluster, retry return err @@ -888,7 +945,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf }, } - if createErr := lr.Client.Create(ctx, ns); createErr != nil { + if createErr := lcr.Client.Create(ctx, ns); createErr != nil { if !errors.IsAlreadyExists(createErr) { klog.V(4).Infof("Namespace %s create failed error: %v", basicPod.Namespace, createErr) return err @@ -914,7 +971,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf klog.V(4).Infof("Creating pod %+v", basicPod) - err = lr.Client.Create(ctx, basicPod) + err = lcr.Client.Create(ctx, basicPod) if err != nil { return fmt.Errorf("could not create pod: %v", err) } @@ -955,7 +1012,13 @@ func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leaf klog.V(5).Infof("Updating pod %+v", podCopy) - err := lr.Client.Update(ctx, podCopy) + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return fmt.Errorf("could not get leaf client resource: %v", err) + } + + err = lcr.Client.Update(ctx, podCopy) if err != nil { return fmt.Errorf("could not update pod: %v", err) } @@ -971,7 +1034,13 @@ func (r *RootPodReconciler) DeletePodInLeafCluster(ctx context.Context, lr *leaf return DeletePodInRootCluster(ctx, rootnamespacedname, r.Client) } - err := lr.Client.Get(ctx, rootnamespacedname, leafPod) + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return fmt.Errorf("could not get leaf client resource: %v", err) + } + + err = lcr.Client.Get(ctx, rootnamespacedname, leafPod) if err != nil { if errors.IsNotFound(err) { @@ -989,7 +1058,7 @@ func (r *RootPodReconciler) DeletePodInLeafCluster(ctx context.Context, lr *leaf } deleteOption := NewLeafDeleteOption(leafPod) - err = lr.Client.Delete(ctx, leafPod, deleteOption) + err = lcr.Client.Delete(ctx, leafPod, deleteOption) if err != nil { if errors.IsNotFound(err) { klog.V(4).Infof("Tried to delete pod %s/%s, but it did not exist in the cluster", leafPod.Namespace, leafPod.Name) diff --git a/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go index 10185e3f9..8969594b5 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/oneway_pv_controller.go @@ -2,6 +2,7 @@ package pv import ( "context" + "fmt" "time" corev1 "k8s.io/api/core/v1" @@ -32,9 +33,10 @@ const ( ) type OnewayPVController struct { - Root client.Client - RootDynamic dynamic.Interface - GlobalLeafManager leafUtils.LeafResourceManager + Root client.Client + RootDynamic dynamic.Interface + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager leafUtils.LeafClientResourceManager } func (c *OnewayPVController) SetupWithManager(mgr manager.Manager) error { @@ -120,34 +122,39 @@ func (c *OnewayPVController) Reconcile(ctx context.Context, request reconcile.Re return reconcile.Result{RequeueAfter: requeueTime}, nil } + lcr, err := c.leafClientResource(leaf) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", leaf.Cluster.Name) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + if pvErr != nil && errors.IsNotFound(pvErr) || !pv.DeletionTimestamp.IsZero() { - return c.clearLeafPV(ctx, leaf, pv) + return c.clearLeafPV(ctx, leaf, lcr, pv) } - return c.ensureLeafPV(ctx, leaf, pv) + return c.ensureLeafPV(ctx, leaf, lcr, pv) } -func (c *OnewayPVController) clearLeafPV(ctx context.Context, leaf *leafUtils.LeafResource, pv *corev1.PersistentVolume) (reconcile.Result, error) { - err := leaf.Clientset.CoreV1().PersistentVolumes().Delete(ctx, pv.Name, metav1.DeleteOptions{}) +func (c *OnewayPVController) clearLeafPV(ctx context.Context, leaf *leafUtils.LeafResource, leafClient *leafUtils.LeafClientResource, pv *corev1.PersistentVolume) (reconcile.Result, error) { + err := leafClient.Clientset.CoreV1().PersistentVolumes().Delete(ctx, pv.Name, metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { - klog.Errorf("delete pv %s in %s cluster failed, error: %v", pv.Name, leaf.ClusterName, err) + klog.Errorf("delete pv %s in %s cluster failed, error: %v", pv.Name, leaf.Cluster.Name, err) return reconcile.Result{RequeueAfter: requeueTime}, nil } return reconcile.Result{}, nil } -func (c *OnewayPVController) ensureLeafPV(ctx context.Context, leaf *leafUtils.LeafResource, pv *corev1.PersistentVolume) (reconcile.Result, error) { - clusterName := leaf.ClusterName +func (c *OnewayPVController) ensureLeafPV(ctx context.Context, leaf *leafUtils.LeafResource, leafClient *leafUtils.LeafClientResource, pv *corev1.PersistentVolume) (reconcile.Result, error) { + clusterName := leaf.Cluster.Name newPV := pv.DeepCopy() - pvc := &corev1.PersistentVolumeClaim{} - err := leaf.Client.Get(ctx, types.NamespacedName{ + err := leafClient.Client.Get(ctx, types.NamespacedName{ Namespace: newPV.Spec.ClaimRef.Namespace, Name: newPV.Spec.ClaimRef.Name, }, pvc) if err != nil { - klog.Errorf("get pvc from cluster %s error: %v, will requeue", leaf.ClusterName, err) + klog.Errorf("get pvc from cluster %s error: %v, will requeue", leaf.Cluster.Name, err) return reconcile.Result{RequeueAfter: quickRequeueTime}, nil } @@ -155,16 +162,16 @@ func (c *OnewayPVController) ensureLeafPV(ctx context.Context, leaf *leafUtils.L newPV.Spec.ClaimRef.UID = pvc.UID anno := newPV.GetAnnotations() - anno = utils.AddResourceClusters(anno, leaf.ClusterName) + anno = utils.AddResourceClusters(anno, leaf.Cluster.Name) anno[utils.KosmosGlobalLabel] = "true" newPV.SetAnnotations(anno) oldPV := &corev1.PersistentVolume{} - err = leaf.Client.Get(ctx, types.NamespacedName{ + err = leafClient.Client.Get(ctx, types.NamespacedName{ Name: newPV.Name, }, oldPV) if err != nil && !errors.IsNotFound(err) { - klog.Errorf("get pv from cluster %s error: %v, will requeue", leaf.ClusterName, err) + klog.Errorf("get pv from cluster %s error: %v, will requeue", leaf.Cluster.Name, err) return reconcile.Result{RequeueAfter: requeueTime}, nil } @@ -172,7 +179,7 @@ func (c *OnewayPVController) ensureLeafPV(ctx context.Context, leaf *leafUtils.L if err != nil && errors.IsNotFound(err) { newPV.UID = "" newPV.ResourceVersion = "" - if err = leaf.Client.Create(ctx, newPV); err != nil && !errors.IsAlreadyExists(err) { + if err = leafClient.Client.Create(ctx, newPV); err != nil && !errors.IsAlreadyExists(err) { klog.Errorf("create pv to cluster %s error: %v, will requeue", clusterName, err) return reconcile.Result{RequeueAfter: requeueTime}, nil } @@ -190,10 +197,19 @@ func (c *OnewayPVController) ensureLeafPV(ctx context.Context, leaf *leafUtils.L klog.Errorf("patch pv error: %v", err) return reconcile.Result{}, err } - _, err = leaf.Clientset.CoreV1().PersistentVolumes().Patch(ctx, newPV.Name, types.MergePatchType, patch, metav1.PatchOptions{}) + _, err = leafClient.Clientset.CoreV1().PersistentVolumes().Patch(ctx, newPV.Name, types.MergePatchType, patch, metav1.PatchOptions{}) if err != nil { - klog.Errorf("patch pv %s to %s cluster failed, error: %v", newPV.Name, leaf.ClusterName, err) + klog.Errorf("patch pv %s to %s cluster failed, error: %v", newPV.Name, leaf.Cluster.Name, err) return reconcile.Result{RequeueAfter: requeueTime}, nil } return reconcile.Result{}, nil } + +func (c *OnewayPVController) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go index 4c64f494e..eac67f020 100644 --- a/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pv/root_pv_controller.go @@ -2,6 +2,7 @@ package pv import ( "context" + "fmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -26,8 +27,9 @@ const ( ) type RootPVController struct { - RootClient client.Client - GlobalLeafManager leafUtils.LeafResourceManager + RootClient client.Client + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager leafUtils.LeafClientResourceManager } func (r *RootPVController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { @@ -86,7 +88,13 @@ func (r *RootPVController) SetupWithManager(mgr manager.Manager) error { return false } - if err = lr.Clientset.CoreV1().PersistentVolumes().Delete(context.TODO(), deleteEvent.Object.GetName(), + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Warningf("get leaf client resource failed, %v", err) + return false + } + + if err = lcr.Clientset.CoreV1().PersistentVolumes().Delete(context.TODO(), deleteEvent.Object.GetName(), metav1.DeleteOptions{}); err != nil { if !errors.IsNotFound(err) { klog.Errorf("delete pv from leaf cluster failed, %q, error: %v", deleteEvent.Object.GetName(), err) @@ -115,7 +123,13 @@ func (r *RootPVController) cleanupPv(pv *v1.PersistentVolume) (reconcile.Result, return reconcile.Result{}, nil } - if err = lr.Clientset.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.GetName(), + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + + if err = lcr.Clientset.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.GetName(), metav1.DeleteOptions{}); err != nil { if !errors.IsNotFound(err) { klog.Errorf("delete pv from leaf cluster failed, %q, error: %v", pv.GetName(), err) @@ -124,3 +138,12 @@ func (r *RootPVController) cleanupPv(pv *v1.PersistentVolume) (reconcile.Result, } return reconcile.Result{}, nil } + +func (r *RootPVController) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := r.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go index ce75e8613..519903ba2 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/oneway_pvc_controller.go @@ -2,6 +2,7 @@ package pvc import ( "context" + "fmt" "time" corev1 "k8s.io/api/core/v1" @@ -31,9 +32,10 @@ const ( ) type OnewayPVCController struct { - Root client.Client - RootDynamic dynamic.Interface - GlobalLeafManager leafUtils.LeafResourceManager + Root client.Client + RootDynamic dynamic.Interface + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager leafUtils.LeafClientResourceManager } func (c *OnewayPVCController) SetupWithManager(mgr manager.Manager) error { @@ -118,34 +120,40 @@ func (c *OnewayPVCController) Reconcile(ctx context.Context, request reconcile.R return reconcile.Result{RequeueAfter: requeueTime}, nil } + lcr, err := c.leafClientResource(leaf) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", leaf.Cluster.Name) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + if pvcErr != nil && errors.IsNotFound(pvcErr) || !rootPVC.DeletionTimestamp.IsZero() { - return c.clearLeafPVC(ctx, leaf, rootPVC) + return c.clearLeafPVC(ctx, leaf, lcr, rootPVC) } - return c.ensureLeafPVC(ctx, leaf, rootPVC) + return c.ensureLeafPVC(ctx, leaf, lcr, rootPVC) } -func (c *OnewayPVCController) clearLeafPVC(ctx context.Context, leaf *leafUtils.LeafResource, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) { +func (c *OnewayPVCController) clearLeafPVC(ctx context.Context, leaf *leafUtils.LeafResource, leafClient *leafUtils.LeafClientResource, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) { return reconcile.Result{}, nil } -func (c *OnewayPVCController) ensureLeafPVC(ctx context.Context, leaf *leafUtils.LeafResource, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) { - clusterName := leaf.ClusterName +func (c *OnewayPVCController) ensureLeafPVC(ctx context.Context, leaf *leafUtils.LeafResource, leafClient *leafUtils.LeafClientResource, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) { + clusterName := leaf.Cluster.Name newPVC := pvc.DeepCopy() anno := newPVC.GetAnnotations() - anno = utils.AddResourceClusters(anno, leaf.ClusterName) + anno = utils.AddResourceClusters(anno, leaf.Cluster.Name) anno[utils.KosmosGlobalLabel] = "true" newPVC.SetAnnotations(anno) oldPVC := &corev1.PersistentVolumeClaim{} - err := leaf.Client.Get(ctx, types.NamespacedName{ + err := leafClient.Client.Get(ctx, types.NamespacedName{ Name: newPVC.Name, Namespace: newPVC.Namespace, }, oldPVC) if err != nil && !errors.IsNotFound(err) { - klog.Errorf("get pvc from cluster %s error: %v, will requeue", leaf.ClusterName, err) + klog.Errorf("get pvc from cluster %s error: %v, will requeue", leaf.Cluster.Name, err) return reconcile.Result{RequeueAfter: requeueTime}, nil } @@ -153,7 +161,7 @@ func (c *OnewayPVCController) ensureLeafPVC(ctx context.Context, leaf *leafUtils if err != nil && errors.IsNotFound(err) { newPVC.UID = "" newPVC.ResourceVersion = "" - if err = leaf.Client.Create(ctx, newPVC); err != nil && !errors.IsAlreadyExists(err) { + if err = leafClient.Client.Create(ctx, newPVC); err != nil && !errors.IsAlreadyExists(err) { klog.Errorf("create pv to cluster %s error: %v, will requeue", clusterName, err) return reconcile.Result{RequeueAfter: requeueTime}, nil } @@ -171,10 +179,19 @@ func (c *OnewayPVCController) ensureLeafPVC(ctx context.Context, leaf *leafUtils klog.Errorf("patch pv error: %v", err) return reconcile.Result{}, err } - _, err = leaf.Clientset.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Patch(ctx, newPVC.Name, types.MergePatchType, patch, metav1.PatchOptions{}) + _, err = leafClient.Clientset.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Patch(ctx, newPVC.Name, types.MergePatchType, patch, metav1.PatchOptions{}) if err != nil { - klog.Errorf("patch pvc %s to %s cluster failed, error: %v", newPVC.Name, leaf.ClusterName, err) + klog.Errorf("patch pvc %s to %s cluster failed, error: %v", newPVC.Name, leaf.Cluster.Name, err) return reconcile.Result{RequeueAfter: requeueTime}, nil } return reconcile.Result{}, nil } + +func (c *OnewayPVCController) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go index 2736ed4b9..b9d02bc04 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/root_pvc_controller.go @@ -2,6 +2,7 @@ package pvc import ( "context" + "fmt" "reflect" v1 "k8s.io/api/core/v1" @@ -28,8 +29,9 @@ const ( ) type RootPVCController struct { - RootClient client.Client - GlobalLeafManager leafUtils.LeafResourceManager + RootClient client.Client + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager leafUtils.LeafClientResourceManager } func (r *RootPVCController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { @@ -62,8 +64,14 @@ func (r *RootPVCController) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + pvcOld := &v1.PersistentVolumeClaim{} - err = lr.Client.Get(ctx, request.NamespacedName, pvcOld) + err = lcr.Client.Get(ctx, request.NamespacedName, pvcOld) if err != nil { if !errors.IsNotFound(err) { klog.Warningf("get pvc from leaf cluster failed, error: %v", err) @@ -90,7 +98,8 @@ func (r *RootPVCController) Reconcile(ctx context.Context, request reconcile.Req klog.Errorf("patch pvc error: %v", err) return reconcile.Result{}, err } - _, err = lr.Clientset.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, + + _, err = lcr.Clientset.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, mergetypes.MergePatchType, patch, metav1.PatchOptions{}) if err != nil && !errors.IsNotFound(err) { klog.Errorf("patch pvc namespace: %q, name: %q from root cluster failed, error: %v", @@ -137,7 +146,13 @@ func (r *RootPVCController) cleanupPvc(pvc *v1.PersistentVolumeClaim) (reconcile return reconcile.Result{}, nil } - if err = lr.Clientset.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Delete(context.TODO(), pvc.GetName(), metav1.DeleteOptions{}); err != nil { + lcr, err := r.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + + if err = lcr.Clientset.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Delete(context.TODO(), pvc.GetName(), metav1.DeleteOptions{}); err != nil { if !errors.IsNotFound(err) { klog.Errorf("delete pvc from leaf cluster failed, %q: %q, error: %v", pvc.GetNamespace(), pvc.GetName(), err) return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, err @@ -145,3 +160,12 @@ func (r *RootPVCController) cleanupPvc(pvc *v1.PersistentVolumeClaim) (reconcile } return reconcile.Result{}, nil } + +func (r *RootPVCController) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := r.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} diff --git a/pkg/clustertree/cluster-manager/node-server/server.go b/pkg/clustertree/cluster-manager/node-server/server.go index 928dbe5f9..fc56b590e 100644 --- a/pkg/clustertree/cluster-manager/node-server/server.go +++ b/pkg/clustertree/cluster-manager/node-server/server.go @@ -39,8 +39,9 @@ func DefaultServerCiphers() []uint16 { } type NodeServer struct { - RootClient client.Client - GlobalLeafManager leafUtils.LeafResourceManager + RootClient client.Client + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager leafUtils.LeafClientResourceManager } type HttpConfig struct { @@ -49,25 +50,31 @@ type HttpConfig struct { tlsConfig *tls.Config } -func (n *NodeServer) getClient(ctx context.Context, namespace string, podName string) (kubernetes.Interface, *rest.Config, error) { +func (s *NodeServer) getClient(ctx context.Context, namespace string, podName string) (kubernetes.Interface, *rest.Config, error) { nsname := types.NamespacedName{ Namespace: namespace, Name: podName, } rootPod := &corev1.Pod{} - if err := n.RootClient.Get(ctx, nsname, rootPod); err != nil { + if err := s.RootClient.Get(ctx, nsname, rootPod); err != nil { return nil, nil, err } nodeName := rootPod.Spec.NodeName - lr, err := n.GlobalLeafManager.GetLeafResourceByNodeName(nodeName) + lr, err := s.GlobalLeafManager.GetLeafResourceByNodeName(nodeName) if err != nil { return nil, nil, err } - return lr.Clientset, lr.RestConfig, nil + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := s.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, nil, fmt.Errorf("get leaf client resource err: %v", err) + } + + return lcr.Clientset, lcr.RestConfig, nil } func (s *NodeServer) RunHTTP(ctx context.Context, httpConfig HttpConfig) (func(), error) { diff --git a/pkg/clustertree/cluster-manager/utils/leaf_client_resource_manager.go b/pkg/clustertree/cluster-manager/utils/leaf_client_resource_manager.go new file mode 100644 index 000000000..3fe5cdc09 --- /dev/null +++ b/pkg/clustertree/cluster-manager/utils/leaf_client_resource_manager.go @@ -0,0 +1,92 @@ +package utils + +import ( + "fmt" + "sync" + + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + "github.com/kosmos.io/kosmos/pkg/utils" +) + +var ( + clientInstance LeafClientResourceManager + clientOnce sync.Once +) + +type LeafClientResource struct { + Client client.Client + DynamicClient dynamic.Interface + Clientset kubernetes.Interface + KosmosClient kosmosversioned.Interface + RestConfig *rest.Config +} + +type LeafClientResourceManager interface { + AddLeafClientResource(lcr *LeafClientResource, cluster *kosmosv1alpha1.Cluster) + + RemoveLeafClientResource(actualClusterName string) + + GetLeafResource(actualClusterName string) (*LeafClientResource, error) +} + +type leafClientResourceManager struct { + clientResourceMap map[string]*LeafClientResource + leafClientResourceManagersLock sync.Mutex +} + +func (cr *leafClientResourceManager) GetLeafResource(actualClusterName string) (*LeafClientResource, error) { + cr.leafClientResourceManagersLock.Lock() + defer cr.leafClientResourceManagersLock.Unlock() + if m, ok := cr.clientResourceMap[actualClusterName]; ok { + return m, nil + } else { + return nil, fmt.Errorf("cannot get leaf client resource, actualClusterName: %s", actualClusterName) + } +} + +func (cr *leafClientResourceManager) AddLeafClientResource(lcr *LeafClientResource, cluster *kosmosv1alpha1.Cluster) { + cr.leafClientResourceManagersLock.Lock() + defer cr.leafClientResourceManagersLock.Unlock() + + actualClusterName := GetActualClusterName(cluster) + + // Only adds or updates the lcr in clientResourceMap if actualClusterName does not exist. + // This prevents updating the map if an entry for actualClusterName already exists. + if _, exists := cr.clientResourceMap[actualClusterName]; !exists { + cr.clientResourceMap[actualClusterName] = lcr + } +} + +func (cr *leafClientResourceManager) RemoveLeafClientResource(actualClusterName string) { + cr.leafClientResourceManagersLock.Lock() + defer cr.leafClientResourceManagersLock.Unlock() + delete(cr.clientResourceMap, actualClusterName) +} + +func GetGlobalLeafClientResourceManager() LeafClientResourceManager { + clientOnce.Do(func() { + clientInstance = &leafClientResourceManager{ + clientResourceMap: make(map[string]*LeafClientResource), + } + }) + + return clientInstance +} + +// GetActualClusterName extracts the actualClusterName from the cluster labels, or use the cluster's name if the specific label is not present. +func GetActualClusterName(cluster *kosmosv1alpha1.Cluster) string { + var actualClusterName string + labels := cluster.Labels + if actualName, ok := labels[utils.KosmosActualClusterName]; ok { + actualClusterName = actualName + } else { + actualClusterName = cluster.Name + } + return actualClusterName +} diff --git a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go index 19188f97d..3a3475116 100644 --- a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go +++ b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go @@ -6,13 +6,8 @@ import ( "sync" corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/utils" ) @@ -37,20 +32,15 @@ type ClusterNode struct { } type LeafResource struct { - Client client.Client - DynamicClient dynamic.Interface - Clientset kubernetes.Interface - KosmosClient kosmosversioned.Interface - ClusterName string + Cluster *kosmosv1alpha1.Cluster Namespace string IgnoreLabels []string EnableServiceAccount bool Nodes []ClusterNode - RestConfig *rest.Config } type LeafResourceManager interface { - AddLeafResource(lr *LeafResource, cluster *kosmosv1alpha1.Cluster, node []*corev1.Node) + AddLeafResource(lr *LeafResource, node []*corev1.Node) RemoveLeafResource(clusterName string) // get leafresource by cluster name GetLeafResource(clusterName string) (*LeafResource, error) @@ -95,13 +85,13 @@ func getClusterNode(clusternodes []ClusterNode, target string) *ClusterNode { return nil } -func (l *leafResourceManager) AddLeafResource(lptr *LeafResource, cluster *kosmosv1alpha1.Cluster, nodes []*corev1.Node) { +func (l *leafResourceManager) AddLeafResource(lptr *LeafResource, nodes []*corev1.Node) { l.leafResourceManagersLock.Lock() defer l.leafResourceManagersLock.Unlock() - clusterName := cluster.Name + clusterName := lptr.Cluster.Name - leafModels := cluster.Spec.ClusterTreeOptions.LeafModels + leafModels := lptr.Cluster.Spec.ClusterTreeOptions.LeafModels clusterNodes := []ClusterNode{} for i, n := range nodes { diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index 2c676d53a..eb2ebad83 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -99,22 +99,23 @@ const ( // cluster node const ( - KosmosNodePrefix = "kosmos-" - KosmosNodeLabel = "kosmos.io/node" - KosmosNodeValue = "true" - KosmosNodeJoinLabel = "kosmos.io/join" - KosmosNodeJoinValue = "true" - KosmosNodeTaintKey = "kosmos.io/node" - KosmosNodeTaintValue = "true" - KosmosNodeTaintEffect = "NoSchedule" - KosmosPodLabel = "kosmos-io/pod" - KosmosGlobalLabel = "kosmos.io/global" - KosmosSelectorKey = "kosmos.io/cluster-selector" - KosmosTrippedLabels = "kosmos-io/tripped" - KosmosConvertLabels = "kosmos-io/convert-policy" - KosmosPvcLabelSelector = "kosmos-io/label-selector" - KosmosExcludeNodeLabel = "kosmos.io/exclude" - KosmosExcludeNodeValue = "true" + KosmosNodePrefix = "kosmos-" + KosmosNodeLabel = "kosmos.io/node" + KosmosActualClusterName = "kosmos.io/actual-cluster-name" + KosmosNodeValue = "true" + KosmosNodeJoinLabel = "kosmos.io/join" + KosmosNodeJoinValue = "true" + KosmosNodeTaintKey = "kosmos.io/node" + KosmosNodeTaintValue = "true" + KosmosNodeTaintEffect = "NoSchedule" + KosmosPodLabel = "kosmos-io/pod" + KosmosGlobalLabel = "kosmos.io/global" + KosmosSelectorKey = "kosmos.io/cluster-selector" + KosmosTrippedLabels = "kosmos-io/tripped" + KosmosConvertLabels = "kosmos-io/convert-policy" + KosmosPvcLabelSelector = "kosmos-io/label-selector" + KosmosExcludeNodeLabel = "kosmos.io/exclude" + KosmosExcludeNodeValue = "true" // on resorce (pv, configmap, secret), represents which cluster this resource belongs to KosmosResourceOwnersAnnotations = "kosmos-io/cluster-owners"