From 4c3010d3d4adce56023a10f21369fde778692888 Mon Sep 17 00:00:00 2001 From: GreatLazyMan Date: Fri, 15 Mar 2024 11:18:29 +0800 Subject: [PATCH] title: virtualcluster join controller create mcs crd Signed-off-by: GreatLazyMan --- .../app/options/options.go | 8 +- cmd/clustertree-operator/app/tree_operator.go | 8 +- .../virtualcluster_join_controller.go | 177 +++++++++++------- 3 files changed, 117 insertions(+), 76 deletions(-) diff --git a/cmd/clustertree-operator/app/options/options.go b/cmd/clustertree-operator/app/options/options.go index d7b9a132d..c60116264 100644 --- a/cmd/clustertree-operator/app/options/options.go +++ b/cmd/clustertree-operator/app/options/options.go @@ -9,9 +9,9 @@ import ( ) type Options struct { - LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubernetesOptions KubernetesOptions - NodeReuse bool + LeaderElection componentbaseconfig.LeaderElectionConfiguration + KubernetesOptions KubernetesOptions + AllowNodeOwnbyMulticluster bool } type KubernetesOptions struct { @@ -44,5 +44,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", 60, "Burst to use while talking with kube-apiserver.") flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.") flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") - flags.BoolVar(&o.NodeReuse, "node-reuse", false, "different virtual cluster can reuse same node.") + flags.BoolVar(&o.AllowNodeOwnbyMulticluster, "multiowner", false, "Allow node own by multicluster or not.") } diff --git a/cmd/clustertree-operator/app/tree_operator.go b/cmd/clustertree-operator/app/tree_operator.go index 30d11672f..4311ceec7 100644 --- a/cmd/clustertree-operator/app/tree_operator.go +++ b/cmd/clustertree-operator/app/tree_operator.go @@ -83,10 +83,10 @@ func run(ctx context.Context, opts *options.Options) error { } VirtualClusterJoinController := controller.VirtualClusterJoinController{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(constants.JoinControllerName), - KubeconfigPath: opts.KubernetesOptions.KubeConfig, - NodeReuse: opts.NodeReuse, + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(constants.JoinControllerName), + KubeconfigPath: opts.KubernetesOptions.KubeConfig, + AllowNodeOwnbyMulticluster: opts.AllowNodeOwnbyMulticluster, } if err = VirtualClusterJoinController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting %s: %v", constants.JoinControllerName, err) diff --git a/pkg/treeoperator/controller/virtualcluster_join_controller.go b/pkg/treeoperator/controller/virtualcluster_join_controller.go index 5ce5b36fe..0965a1fbb 100644 --- a/pkg/treeoperator/controller/virtualcluster_join_controller.go +++ b/pkg/treeoperator/controller/virtualcluster_join_controller.go @@ -10,7 +10,6 @@ import ( corev1 "k8s.io/api/core/v1" extensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -32,7 +31,6 @@ import ( clusterManager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" - "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" kosmosctl "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" "github.com/kosmos.io/kosmos/pkg/treeoperator/constants" treemanifest "github.com/kosmos.io/kosmos/pkg/treeoperator/manifest" @@ -42,14 +40,15 @@ import ( type VirtualClusterJoinController struct { client.Client - EventRecorder record.EventRecorder - KubeconfigPath string - KubeconfigStream []byte - NodeReuse bool + EventRecorder record.EventRecorder + KubeconfigPath string + KubeconfigStream []byte + AllowNodeOwnbyMulticluster bool } var nodeOwnerMap map[string]string = make(map[string]string) var mu sync.Mutex +var once sync.Once func (c *VirtualClusterJoinController) RemoveClusterFinalizer(cluster *v1alpha1.Cluster, kosmosClient versioned.Interface) error { for _, finalizer := range []string{utils.ClusterStartControllerFinalizer, clusterManager.ControllerFinalizerName} { @@ -68,6 +67,57 @@ func (c *VirtualClusterJoinController) RemoveClusterFinalizer(cluster *v1alpha1. return nil } +func (c *VirtualClusterJoinController) InitNodeOwnerMap() { + vcList := &v1alpha1.VirtualClusterList{} + err := c.List(context.Background(), vcList) + if err != nil { + klog.Errorf("list virtual cluster error: %v", err) + return + } + for _, vc := range vcList.Items { + if vc.Status.Phase == constants.VirtualClusterStatusCompleted { + kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig) + if err != nil { + klog.Errorf("virtualcluster %s decode target kubernetes kubeconfig %s err: %v", vc.Name, vc.Spec.Kubeconfig, err) + continue + } + kosmosClient, _, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream) + if err != nil { + klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", vc.Name, err) + continue + } + _, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "clusters.kosmos.io", metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("virtualcluster %s get crd clusters.kosmos.io err: %v", vc.Name, err) + } + klog.Infof("virtualcluster %s crd clusters.kosmos.io doesn't exist", vc.Name) + continue + } + clusters, err := kosmosClient.KosmosV1alpha1().Clusters().List(context.Background(), metav1.ListOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + klog.Infof("virtualcluster %s get clusters err: %v", vc.Name, err) + } + klog.Infof("virtualcluster %s cluster doesn't exist", vc.Name) + continue + } + mu.Lock() + for _, cluster := range clusters.Items { + for _, node := range cluster.Spec.ClusterTreeOptions.LeafModels { + if vcName, ok := nodeOwnerMap[node.LeafNodeName]; ok && len(vcName) > 0 { + klog.Warningf("node %s also belong to cluster %s", node.LeafNodeName, vcName) + } + nodeOwnerMap[node.LeafNodeName] = vc.Name + } + } + mu.Unlock() + } + klog.Infof("check virtualcluster %s, nodeOwnerMap is %v", vc.Name, nodeOwnerMap) + } + klog.Infof("Init nodeOwnerMap is %v", nodeOwnerMap) +} + func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { klog.Infof("Start deleting kosmos-clustertree deployment %s/%s-clustertree-cluster-manager...", request.Namespace, request.Name) clustertreeDeploy, err := kosmosctl.GenerateDeployment(treemanifest.ClusterTreeClusterManagerDeployment, treemanifest.DeploymentReplace{ @@ -135,7 +185,7 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, if err != nil { return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err) } - err, kosmosClient, _, _ := c.InitTargetKubeclient(kubeconfigStream) + kosmosClient, _, _, err := c.InitTargetKubeclient(kubeconfigStream) if err != nil { return fmt.Errorf("create kubernetes client failed: %v", err) } @@ -147,7 +197,7 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, return fmt.Errorf("get cluster %s failed when we try to del: %v", clusterName, err) } } else { - if !c.NodeReuse { + if !c.AllowNodeOwnbyMulticluster { mu.Lock() for _, nodeName := range old.Spec.ClusterTreeOptions.LeafModels { nodeOwnerMap[nodeName.LeafNodeName] = "" @@ -170,30 +220,30 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, return nil } -func (c *VirtualClusterJoinController) InitTargetKubeclient(kubeconfigStream []byte) (error, versioned.Interface, kubernetes.Interface, extensionsclient.Interface) { +func (c *VirtualClusterJoinController) InitTargetKubeclient(kubeconfigStream []byte) (versioned.Interface, kubernetes.Interface, extensionsclient.Interface, error) { //targetKubeconfig := path.Join(DefaultKubeconfigPath, "kubeconfig") //config, err := utils.RestConfig(targetKubeconfig, "") config, err := utils.NewConfigFromBytes(kubeconfigStream) if err != nil { - return fmt.Errorf("generate kubernetes config failed: %s", err), nil, nil, nil + return nil, nil, nil, fmt.Errorf("generate kubernetes config failed: %s", err) } kosmosClient, err := versioned.NewForConfig(config) if err != nil { - return fmt.Errorf("generate Kosmos client failed: %v", err), nil, nil, nil + return nil, nil, nil, fmt.Errorf("generate Kosmos client failed: %v", err) } k8sClient, err := kubernetes.NewForConfig(config) if err != nil { - return fmt.Errorf("generate K8s basic client failed: %v", err), nil, nil, nil + return nil, nil, nil, fmt.Errorf("generate K8s basic client failed: %v", err) } k8sExtensionsClient, err := extensionsclient.NewForConfig(config) if err != nil { - return fmt.Errorf("generate K8s extensions client failed: %v", err), nil, nil, nil + return nil, nil, nil, fmt.Errorf("generate K8s extensions client failed: %v", err) } - return nil, kosmosClient, k8sClient, k8sExtensionsClient + return kosmosClient, k8sClient, k8sExtensionsClient, nil } func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { @@ -249,57 +299,43 @@ func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request return nil } -func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { - klog.Infof("Attempting to create kosmos-clustertree CRDs for virtualcluster %s/%s...", request.Namespace, request.Name) - clustertreeCluster, err := util.GenerateCustomResourceDefinition(manifest.Cluster, nil) - if err != nil { - return err +func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames []string) { + if !c.AllowNodeOwnbyMulticluster { + mu.Lock() + for _, nodeName := range nodeNames { + nodeOwnerMap[nodeName] = "" + } + mu.Unlock() } +} +func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig) if err != nil { return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err) } - err, kosmosClient, _, k8sExtensionsClient := c.InitTargetKubeclient(kubeconfigStream) + kosmosClient, _, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream) if err != nil { return fmt.Errorf("crd kubernetes client failed: %v", err) } - serviceExport, err := util.GenerateCustomResourceDefinition(manifest.ServiceExport, nil) - if err != nil { - return err - } - _, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), serviceExport, metav1.CreateOptions{}) - if err != nil { - if !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("Create CRD %s for virtualcluster %s/%s failed: %v.", - serviceExport.Name, request.Namespace, request.Name, err) - } - } - klog.Infof("Create CRD %s for virtualcluster %s/%s successful.", serviceExport.Name, request.Namespace, request.Name) - - serviceImport, err := util.GenerateCustomResourceDefinition(manifest.ServiceImport, nil) - if err != nil { - return err - } - _, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), serviceImport, metav1.CreateOptions{}) - if err != nil { - if !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("Create CRD %s for virtualcluster %s/%s failed: %v.", - serviceImport.Name, request.Namespace, request.Name, err) + klog.Infof("Attempting to create kosmos-clustertree CRDs for virtualcluster %s/%s...", request.Namespace, request.Name) + for _, crdToCreate := range []string{manifest.ServiceImport, manifest.Cluster, manifest.ServiceExport} { + crdObject, err := kosmosctl.GenerateCustomResourceDefinition(crdToCreate, nil) + if err != nil { + return err } - } - klog.Info("Create CRD %s for virtualcluster %s/%s successful.", serviceImport.Name, request.Namespace, request.Name) - - _, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), clustertreeCluster, metav1.CreateOptions{}) - if err != nil { - if apierrors.IsAlreadyExists(err) { - klog.Warningf("CRD %s is existed, creation process will skip", clustertreeCluster.Name) + _, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), crdObject, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("create CRD %s for virtualcluster %s/%s failed: %v", + crdObject.Name, request.Namespace, request.Name, err) + } + klog.Warningf("CRD %s is existed, creation process will skip", crdObject.Name) } else { - return fmt.Errorf("CRD create failed for virtualcluster %s/%s: %v", request.Namespace, request.Name, err) + klog.Infof("Create CRD %s for virtualcluster %s/%s successful.", crdObject.Name, request.Namespace, request.Name) } } - klog.Infof("Create CRD %s for virtualcluster %s/%s successful.", clustertreeCluster.Name, request.Namespace, request.Name) clusterName := fmt.Sprintf("virtualcluster-%s-%s", request.Namespace, request.Name) klog.Infof("Attempting to create cluster %s for %s/%s ...", clusterName, request.Namespace, request.Name) @@ -327,18 +363,25 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques return fmt.Errorf("crd kubernetes client failed: %v", err) } var leafModels []v1alpha1.LeafModel - mu.Lock() + newNodeNames := []string{} + for _, nodeName := range vc.Spec.PromoteResources.Nodes { _, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) if err != nil { klog.Errorf("node %s doesn't exits: %v", nodeName, err) continue } - if !c.NodeReuse && len(nodeOwnerMap) > 0 { - if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 && - nodeOwner != clusterName { - continue + if !c.AllowNodeOwnbyMulticluster { + mu.Lock() + if len(nodeOwnerMap) > 0 { + if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 && + nodeOwner != clusterName { + continue + } } + nodeOwnerMap[nodeName] = clusterName + newNodeNames = append(newNodeNames, nodeName) + mu.Unlock() } leafModel := v1alpha1.LeafModel{ LeafNodeName: nodeName, @@ -353,12 +396,8 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques NodeName: nodeName, }, } - if !c.NodeReuse { - nodeOwnerMap[nodeName] = clusterName - } leafModels = append(leafModels, leafModel) } - mu.Unlock() cluster.Spec.ClusterTreeOptions.LeafModels = leafModels old, err := kosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{}) @@ -366,24 +405,23 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques if apierrors.IsNotFound(err) { _, err = kosmosClient.KosmosV1alpha1().Clusters().Create(context.TODO(), &cluster, metav1.CreateOptions{}) if err != nil { + c.ClearSomeNodeOwner(newNodeNames) return fmt.Errorf("create cluster %s failed: %v", clusterName, err) } } else { + c.ClearSomeNodeOwner(newNodeNames) return fmt.Errorf("create cluster %s failed when get it first: %v", clusterName, err) } + klog.Infof("Cluster %s for %s/%s has been created.", clusterName, request.Namespace, request.Name) } else { cluster.ResourceVersion = old.GetResourceVersion() - update, err := kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), &cluster, metav1.UpdateOptions{}) + _, err := kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), &cluster, metav1.UpdateOptions{}) if err != nil { + c.ClearSomeNodeOwner(newNodeNames) return fmt.Errorf("update cluster %s failed: %v", clusterName, err) - } else { - klog.Infof("Cluster %s hase been updated.", clusterName) - } - if !update.DeletionTimestamp.IsZero() { - return fmt.Errorf("cluster %s is deleteting, need requeue", clusterName) } + klog.Infof("Cluster %s for %s/%s has been updated.", clusterName, request.Namespace, request.Name) } - klog.Infof("Cluster %s for %s/%s has been created.", clusterName, request.Namespace, request.Name) return nil } @@ -430,10 +468,13 @@ func (c *VirtualClusterJoinController) InstallClusterTree(ctx context.Context, r func (c *VirtualClusterJoinController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { klog.V(4).Infof("============ %s starts to reconcile %s ============", constants.JoinControllerName, request.Name) defer klog.V(4).Infof("============ %s reconcile finish %s ============", constants.JoinControllerName, request.Name) + if !c.AllowNodeOwnbyMulticluster { + once.Do(c.InitNodeOwnerMap) + } var vc v1alpha1.VirtualCluster if err := c.Get(ctx, request.NamespacedName, &vc); err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { return reconcile.Result{}, nil } klog.Errorf("get %s error: %v", request.NamespacedName, err)