From 78067fe0d0770e831c326112fe58e6164c25a650 Mon Sep 17 00:00:00 2001 From: GreatLazyMan Date: Wed, 20 Mar 2024 19:25:02 +0800 Subject: [PATCH] title: virtualcluster join controller could delete useless node when update virtualclustre Signed-off-by: GreatLazyMan --- .../virtualcluster_join_controller.go | 176 ++++++++++++------ 1 file changed, 118 insertions(+), 58 deletions(-) diff --git a/pkg/treeoperator/controller/virtualcluster_join_controller.go b/pkg/treeoperator/controller/virtualcluster_join_controller.go index 0965a1fbb..bf1050a75 100644 --- a/pkg/treeoperator/controller/virtualcluster_join_controller.go +++ b/pkg/treeoperator/controller/virtualcluster_join_controller.go @@ -299,26 +299,133 @@ func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request return nil } -func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames []string) { +func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames *[]string) { if !c.AllowNodeOwnbyMulticluster { mu.Lock() - for _, nodeName := range nodeNames { + for _, nodeName := range *nodeNames { nodeOwnerMap[nodeName] = "" } mu.Unlock() } } +func (c *VirtualClusterJoinController) CreateClusterObject(ctx context.Context, request reconcile.Request, + vc *v1alpha1.VirtualCluster, hostK8sClient kubernetes.Interface, cluster *v1alpha1.Cluster) (*[]string, *map[string]struct{}, error) { + var leafModels []v1alpha1.LeafModel + // recored new nodes' name, if error happen before create or update, need clear newNodeNames + newNodeNames := []string{} + // record all nodes' name in a map, when update cr, may need to delete some old node + // compare all nodes in cluster cr to all node exits in virtual cluster,we can find which ndoe should be deleted + allNodeNamesMap := map[string]struct{}{} + + for _, nodeName := range vc.Spec.PromoteResources.Nodes { + _, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("node %s doesn't exits: %v", nodeName, err) + continue + } + c.ClearSomeNodeOwner(&newNodeNames) + klog.Errorf("get node %s error: %v", nodeName, err) + return nil, nil, err + } + if !c.AllowNodeOwnbyMulticluster { + mu.Lock() + if len(nodeOwnerMap) > 0 { + if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 { + if nodeOwner != cluster.Name { + continue + } + } else { + newNodeNames = append(newNodeNames, nodeName) + } + } else { + newNodeNames = append(newNodeNames, nodeName) + } + allNodeNamesMap[nodeName] = struct{}{} + nodeOwnerMap[nodeName] = cluster.Name + mu.Unlock() + } + leafModel := v1alpha1.LeafModel{ + LeafNodeName: nodeName, + Taints: []corev1.Taint{ + { + Effect: utils.KosmosNodeTaintEffect, + Key: utils.KosmosNodeTaintKey, + Value: utils.KosmosNodeValue, + }, + }, + NodeSelector: v1alpha1.NodeSelector{ + NodeName: nodeName, + }, + } + leafModels = append(leafModels, leafModel) + } + klog.V(7).Infof("all new node in cluster %s: %v", cluster.Name, newNodeNames) + klog.V(7).Infof("all node in cluster %s: %v", cluster.Name, allNodeNamesMap) + cluster.Spec.ClusterTreeOptions.LeafModels = leafModels + + return &newNodeNames, &allNodeNamesMap, nil +} + +func (c *VirtualClusterJoinController) CreateOrUpdateCluster(ctx context.Context, request reconcile.Request, + kosmosClient versioned.Interface, k8sClient kubernetes.Interface, newNodeNames *[]string, + allNodeNamesMap *map[string]struct{}, cluster *v1alpha1.Cluster) error { + old, err := kosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), cluster.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + _, err = kosmosClient.KosmosV1alpha1().Clusters().Create(context.TODO(), cluster, metav1.CreateOptions{}) + if err != nil { + c.ClearSomeNodeOwner(newNodeNames) + return fmt.Errorf("create cluster %s failed: %v", cluster.Name, err) + } + } else { + c.ClearSomeNodeOwner(newNodeNames) + return fmt.Errorf("create cluster %s failed when get it first: %v", cluster.Name, err) + } + klog.Infof("Cluster %s for %s/%s has been created.", cluster.Name, request.Namespace, request.Name) + } else { + cluster.ResourceVersion = old.GetResourceVersion() + _, err = kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{}) + if err != nil { + c.ClearSomeNodeOwner(newNodeNames) + return fmt.Errorf("update cluster %s failed: %v", cluster.Name, err) + } + + k8sNodesList, err := k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("list %s's k8s nodes error: %v", cluster.Name, err) + } + // clear node, delete some node not in new VirtualCluster.spec.PromoteResources.Nodes + for _, node := range k8sNodesList.Items { + if _, ok := (*allNodeNamesMap)[node.Name]; !ok { + // if existed node not in map, it should be deleted + err := k8sClient.CoreV1().Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("delete %s's k8s nodes error: %v", cluster.Name, err) + } + // clear ndoe's owner + mu.Lock() + nodeOwnerMap[node.Name] = "" + mu.Unlock() + } + } + klog.Infof("Cluster %s for %s/%s has been updated.", cluster.Name, request.Namespace, request.Name) + } + return nil +} + func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig) if err != nil { return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err) } - kosmosClient, _, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream) + kosmosClient, k8sClient, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream) if err != nil { return fmt.Errorf("crd kubernetes client failed: %v", err) } + // create crd cluster.kosmos.io klog.Infof("Attempting to create kosmos-clustertree CRDs for virtualcluster %s/%s...", request.Namespace, request.Name) for _, crdToCreate := range []string{manifest.ServiceImport, manifest.Cluster, manifest.ServiceExport} { crdObject, err := kosmosctl.GenerateCustomResourceDefinition(crdToCreate, nil) @@ -337,6 +444,7 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques } } + // construct cluster.kosmos.io cr clusterName := fmt.Sprintf("virtualcluster-%s-%s", request.Namespace, request.Name) klog.Infof("Attempting to create cluster %s for %s/%s ...", clusterName, request.Namespace, request.Name) @@ -362,66 +470,18 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques if err != nil { return fmt.Errorf("crd kubernetes client failed: %v", err) } - var leafModels []v1alpha1.LeafModel - newNodeNames := []string{} - for _, nodeName := range vc.Spec.PromoteResources.Nodes { - _, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - klog.Errorf("node %s doesn't exits: %v", nodeName, err) - continue - } - if !c.AllowNodeOwnbyMulticluster { - mu.Lock() - if len(nodeOwnerMap) > 0 { - if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 && - nodeOwner != clusterName { - continue - } - } - nodeOwnerMap[nodeName] = clusterName - newNodeNames = append(newNodeNames, nodeName) - mu.Unlock() - } - leafModel := v1alpha1.LeafModel{ - LeafNodeName: nodeName, - Taints: []corev1.Taint{ - { - Effect: utils.KosmosNodeTaintEffect, - Key: utils.KosmosNodeTaintKey, - Value: utils.KosmosNodeValue, - }, - }, - NodeSelector: v1alpha1.NodeSelector{ - NodeName: nodeName, - }, - } - leafModels = append(leafModels, leafModel) + newNodeNames, allNodeNamesMap, nil := c.CreateClusterObject(ctx, request, vc, hostK8sClient, &cluster) + if err != nil { + return err } - cluster.Spec.ClusterTreeOptions.LeafModels = leafModels - old, err := kosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{}) + // use client-go to create or update cluster.kosmos.io cr + err = c.CreateOrUpdateCluster(ctx, request, kosmosClient, k8sClient, newNodeNames, allNodeNamesMap, &cluster) if err != nil { - if apierrors.IsNotFound(err) { - _, err = kosmosClient.KosmosV1alpha1().Clusters().Create(context.TODO(), &cluster, metav1.CreateOptions{}) - if err != nil { - c.ClearSomeNodeOwner(newNodeNames) - return fmt.Errorf("create cluster %s failed: %v", clusterName, err) - } - } else { - c.ClearSomeNodeOwner(newNodeNames) - return fmt.Errorf("create cluster %s failed when get it first: %v", clusterName, err) - } - klog.Infof("Cluster %s for %s/%s has been created.", clusterName, request.Namespace, request.Name) - } else { - cluster.ResourceVersion = old.GetResourceVersion() - _, err := kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), &cluster, metav1.UpdateOptions{}) - if err != nil { - c.ClearSomeNodeOwner(newNodeNames) - return fmt.Errorf("update cluster %s failed: %v", clusterName, err) - } - klog.Infof("Cluster %s for %s/%s has been updated.", clusterName, request.Namespace, request.Name) + return err } + return nil }