Skip to content

Commit

Permalink
title: virtualcluster join controller could delete useless node when …
Browse files Browse the repository at this point in the history
…update virtualcluster

Signed-off-by: GreatLazyMan <[email protected]>
  • Loading branch information
GreatLazyMan committed Mar 20, 2024
1 parent 4c3010d commit 78067fe
Showing 1 changed file with 118 additions and 58 deletions.
176 changes: 118 additions & 58 deletions pkg/treeoperator/controller/virtualcluster_join_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,26 +299,133 @@ func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request
return nil
}

// ClearSomeNodeOwner resets the owner recorded in nodeOwnerMap to the empty
// string for every name in nodeNames, holding mu while it writes. It does
// nothing when c.AllowNodeOwnbyMulticluster is true.
// NOTE(review): this span looks like a rendered commit diff, so the two
// signature lines and the two range lines below are presumably before/after
// pairs ([]string vs. *[]string) rather than one compilable function — confirm
// against the real file.
func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames []string) {
func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames *[]string) {
if !c.AllowNodeOwnbyMulticluster {
// mu guards every write to nodeOwnerMap in this block.
mu.Lock()
for _, nodeName := range nodeNames {
for _, nodeName := range *nodeNames {
// An empty owner string marks the node as unowned.
nodeOwnerMap[nodeName] = ""
}
mu.Unlock()
}
}

// CreateClusterObject fills cluster.Spec.ClusterTreeOptions.LeafModels from
// vc.Spec.PromoteResources.Nodes and records node ownership for the cluster.
//
// Every requested node is looked up through hostK8sClient. Nodes that are not
// found are skipped with a warning. Unless c.AllowNodeOwnbyMulticluster is
// set, a node whose owner in nodeOwnerMap is a different, non-empty cluster
// name is skipped as well; otherwise cluster.Name is recorded as its owner.
// Each remaining node gets a LeafModel carrying the kosmos taint.
//
// It returns:
//   - the names of the nodes whose ownership was newly recorded by this call,
//     so the caller can roll those records back if a later step fails;
//   - the set of all node names that received a LeafModel, which the caller
//     compares against the nodes that exist in the virtual cluster to find
//     the ones that should be deleted;
//   - a non-nil error when a node lookup fails for a reason other than
//     NotFound. In that case the ownership recorded so far by this call is
//     cleared and both pointers are nil.
func (c *VirtualClusterJoinController) CreateClusterObject(ctx context.Context, request reconcile.Request,
	vc *v1alpha1.VirtualCluster, hostK8sClient kubernetes.Interface, cluster *v1alpha1.Cluster) (*[]string, *map[string]struct{}, error) {
	var leafModels []v1alpha1.LeafModel
	// Names of nodes newly claimed by this call; cleared again on error.
	newNodeNames := []string{}
	// Every node that ends up in the cluster CR. It must be filled in both
	// ownership modes, otherwise the caller would treat all existing nodes as
	// obsolete.
	allNodeNamesMap := map[string]struct{}{}

	// claimNode records cluster.Name as the owner of nodeName and reports
	// whether the node may be used by this cluster. The deferred Unlock
	// guarantees mu is released on every path, including the rejection path.
	claimNode := func(nodeName string) bool {
		mu.Lock()
		defer mu.Unlock()

		if owner, owned := nodeOwnerMap[nodeName]; owned && owner != "" {
			if owner != cluster.Name {
				return false
			}
			// Already owned by this cluster: keep it, but it is not "new".
		} else {
			newNodeNames = append(newNodeNames, nodeName)
		}
		nodeOwnerMap[nodeName] = cluster.Name
		return true
	}

	for _, nodeName := range vc.Spec.PromoteResources.Nodes {
		if _, err := hostK8sClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}); err != nil {
			if apierrors.IsNotFound(err) {
				klog.Warningf("node %s doesn't exits: %v", nodeName, err)
				continue
			}
			// Roll back the ownership recorded so far; mu is not held here.
			c.ClearSomeNodeOwner(&newNodeNames)
			klog.Errorf("get node %s error: %v", nodeName, err)
			return nil, nil, err
		}

		if !c.AllowNodeOwnbyMulticluster && !claimNode(nodeName) {
			klog.Warningf("node %s is owned by another cluster, skip it for cluster %s", nodeName, cluster.Name)
			continue
		}

		allNodeNamesMap[nodeName] = struct{}{}
		leafModels = append(leafModels, v1alpha1.LeafModel{
			LeafNodeName: nodeName,
			Taints: []corev1.Taint{
				{
					Effect: utils.KosmosNodeTaintEffect,
					Key:    utils.KosmosNodeTaintKey,
					Value:  utils.KosmosNodeValue,
				},
			},
			NodeSelector: v1alpha1.NodeSelector{
				NodeName: nodeName,
			},
		})
	}

	klog.V(7).Infof("all new node in cluster %s: %v", cluster.Name, newNodeNames)
	klog.V(7).Infof("all node in cluster %s: %v", cluster.Name, allNodeNamesMap)
	cluster.Spec.ClusterTreeOptions.LeafModels = leafModels

	return &newNodeNames, &allNodeNamesMap, nil
}

// CreateOrUpdateCluster creates the cluster.kosmos.io object described by
// cluster through kosmosClient, or updates it when an object with the same
// name already exists.
//
// On the update path it additionally lists the nodes visible through
// k8sClient and deletes every node whose name is not a key of
// allNodeNamesMap, then releases the matching ownership record in
// nodeOwnerMap. allNodeNamesMap must therefore contain every node that is
// supposed to stay; a nil allNodeNamesMap skips the clean-up entirely.
// NOTE(review): the clean-up does not filter by label or taint, so it
// presumably also removes nodes that were never created by kosmos — confirm
// that this is intended.
//
// When getting, creating or updating the cluster object fails, the ownership
// records listed in newNodeNames are cleared (nil is tolerated) and a
// descriptive error is returned. Failures while listing or deleting nodes are
// returned without touching newNodeNames, because the cluster object has
// already been updated at that point.
func (c *VirtualClusterJoinController) CreateOrUpdateCluster(ctx context.Context, request reconcile.Request,
	kosmosClient versioned.Interface, k8sClient kubernetes.Interface, newNodeNames *[]string,
	allNodeNamesMap *map[string]struct{}, cluster *v1alpha1.Cluster) error {
	// releaseNewNodes undoes the ownership recorded for this reconcile.
	releaseNewNodes := func() {
		if newNodeNames != nil {
			c.ClearSomeNodeOwner(newNodeNames)
		}
	}

	clusters := kosmosClient.KosmosV1alpha1().Clusters()

	old, err := clusters.Get(ctx, cluster.Name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		if _, err = clusters.Create(ctx, cluster, metav1.CreateOptions{}); err != nil {
			releaseNewNodes()
			return fmt.Errorf("create cluster %s failed: %v", cluster.Name, err)
		}
		klog.Infof("Cluster %s for %s/%s has been created.", cluster.Name, request.Namespace, request.Name)
		return nil
	}
	if err != nil {
		releaseNewNodes()
		return fmt.Errorf("create cluster %s failed when get it first: %v", cluster.Name, err)
	}

	// The object exists: carry over its resource version so the update is
	// accepted by the API server.
	cluster.ResourceVersion = old.GetResourceVersion()
	if _, err = clusters.Update(ctx, cluster, metav1.UpdateOptions{}); err != nil {
		releaseNewNodes()
		return fmt.Errorf("update cluster %s failed: %v", cluster.Name, err)
	}

	if allNodeNamesMap != nil {
		k8sNodesList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		if err != nil {
			return fmt.Errorf("list %s's k8s nodes error: %v", cluster.Name, err)
		}
		// Delete the nodes that are no longer part of
		// VirtualCluster.spec.PromoteResources.Nodes.
		wanted := *allNodeNamesMap
		for i := range k8sNodesList.Items {
			nodeName := k8sNodesList.Items[i].Name
			if _, keep := wanted[nodeName]; keep {
				continue
			}
			err := k8sClient.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{})
			if err != nil && !apierrors.IsNotFound(err) {
				return fmt.Errorf("delete %s's k8s nodes error: %v", cluster.Name, err)
			}
			// Release the node's owner, but never overwrite a record that
			// belongs to a different cluster.
			mu.Lock()
			if owner := nodeOwnerMap[nodeName]; owner == "" || owner == cluster.Name {
				nodeOwnerMap[nodeName] = ""
			}
			mu.Unlock()
		}
	}

	klog.Infof("Cluster %s for %s/%s has been updated.", cluster.Name, request.Namespace, request.Name)
	return nil
}

func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error {
kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig)
if err != nil {
return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err)
}
kosmosClient, _, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream)
kosmosClient, k8sClient, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream)
if err != nil {
return fmt.Errorf("crd kubernetes client failed: %v", err)
}

// create crd cluster.kosmos.io
klog.Infof("Attempting to create kosmos-clustertree CRDs for virtualcluster %s/%s...", request.Namespace, request.Name)
for _, crdToCreate := range []string{manifest.ServiceImport, manifest.Cluster, manifest.ServiceExport} {
crdObject, err := kosmosctl.GenerateCustomResourceDefinition(crdToCreate, nil)
Expand All @@ -337,6 +444,7 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques
}
}

// construct cluster.kosmos.io cr
clusterName := fmt.Sprintf("virtualcluster-%s-%s", request.Namespace, request.Name)
klog.Infof("Attempting to create cluster %s for %s/%s ...", clusterName, request.Namespace, request.Name)

Expand All @@ -362,66 +470,18 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques
if err != nil {
return fmt.Errorf("crd kubernetes client failed: %v", err)
}
var leafModels []v1alpha1.LeafModel
newNodeNames := []string{}

for _, nodeName := range vc.Spec.PromoteResources.Nodes {
_, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
if err != nil {
klog.Errorf("node %s doesn't exits: %v", nodeName, err)
continue
}
if !c.AllowNodeOwnbyMulticluster {
mu.Lock()
if len(nodeOwnerMap) > 0 {
if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 &&
nodeOwner != clusterName {
continue
}
}
nodeOwnerMap[nodeName] = clusterName
newNodeNames = append(newNodeNames, nodeName)
mu.Unlock()
}
leafModel := v1alpha1.LeafModel{
LeafNodeName: nodeName,
Taints: []corev1.Taint{
{
Effect: utils.KosmosNodeTaintEffect,
Key: utils.KosmosNodeTaintKey,
Value: utils.KosmosNodeValue,
},
},
NodeSelector: v1alpha1.NodeSelector{
NodeName: nodeName,
},
}
leafModels = append(leafModels, leafModel)
newNodeNames, allNodeNamesMap, nil := c.CreateClusterObject(ctx, request, vc, hostK8sClient, &cluster)
if err != nil {
return err
}
cluster.Spec.ClusterTreeOptions.LeafModels = leafModels

old, err := kosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
// use client-go to create or update cluster.kosmos.io cr
err = c.CreateOrUpdateCluster(ctx, request, kosmosClient, k8sClient, newNodeNames, allNodeNamesMap, &cluster)
if err != nil {
if apierrors.IsNotFound(err) {
_, err = kosmosClient.KosmosV1alpha1().Clusters().Create(context.TODO(), &cluster, metav1.CreateOptions{})
if err != nil {
c.ClearSomeNodeOwner(newNodeNames)
return fmt.Errorf("create cluster %s failed: %v", clusterName, err)
}
} else {
c.ClearSomeNodeOwner(newNodeNames)
return fmt.Errorf("create cluster %s failed when get it first: %v", clusterName, err)
}
klog.Infof("Cluster %s for %s/%s has been created.", clusterName, request.Namespace, request.Name)
} else {
cluster.ResourceVersion = old.GetResourceVersion()
_, err := kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), &cluster, metav1.UpdateOptions{})
if err != nil {
c.ClearSomeNodeOwner(newNodeNames)
return fmt.Errorf("update cluster %s failed: %v", clusterName, err)
}
klog.Infof("Cluster %s for %s/%s has been updated.", clusterName, request.Namespace, request.Name)
return err
}

return nil
}

Expand Down

0 comments on commit 78067fe

Please sign in to comment.