Skip to content

Commit

Permalink
title: virtualcluster join controller create mcs crd
Browse files Browse the repository at this point in the history
Signed-off-by: GreatLazyMan <[email protected]>
  • Loading branch information
GreatLazyMan committed Mar 15, 2024
1 parent 98a25f3 commit 4c3010d
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 76 deletions.
8 changes: 4 additions & 4 deletions cmd/clustertree-operator/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
)

type Options struct {
LeaderElection componentbaseconfig.LeaderElectionConfiguration
KubernetesOptions KubernetesOptions
NodeReuse bool
LeaderElection componentbaseconfig.LeaderElectionConfiguration
KubernetesOptions KubernetesOptions
AllowNodeOwnbyMulticluster bool
}

type KubernetesOptions struct {
Expand Down Expand Up @@ -44,5 +44,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", 60, "Burst to use while talking with kube-apiserver.")
flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.")
flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.")
flags.BoolVar(&o.NodeReuse, "node-reuse", false, "different virtual cluster can reuse same node.")
flags.BoolVar(&o.AllowNodeOwnbyMulticluster, "multiowner", false, "Allow node own by multicluster or not.")
}
8 changes: 4 additions & 4 deletions cmd/clustertree-operator/app/tree_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ func run(ctx context.Context, opts *options.Options) error {
}

VirtualClusterJoinController := controller.VirtualClusterJoinController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(constants.JoinControllerName),
KubeconfigPath: opts.KubernetesOptions.KubeConfig,
NodeReuse: opts.NodeReuse,
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(constants.JoinControllerName),
KubeconfigPath: opts.KubernetesOptions.KubeConfig,
AllowNodeOwnbyMulticluster: opts.AllowNodeOwnbyMulticluster,
}
if err = VirtualClusterJoinController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", constants.JoinControllerName, err)
Expand Down
177 changes: 109 additions & 68 deletions pkg/treeoperator/controller/virtualcluster_join_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

corev1 "k8s.io/api/core/v1"
extensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -32,7 +31,6 @@ import (
clusterManager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest"
"github.com/kosmos.io/kosmos/pkg/kosmosctl/util"
kosmosctl "github.com/kosmos.io/kosmos/pkg/kosmosctl/util"
"github.com/kosmos.io/kosmos/pkg/treeoperator/constants"
treemanifest "github.com/kosmos.io/kosmos/pkg/treeoperator/manifest"
Expand All @@ -42,14 +40,15 @@ import (

type VirtualClusterJoinController struct {
client.Client
EventRecorder record.EventRecorder
KubeconfigPath string
KubeconfigStream []byte
NodeReuse bool
EventRecorder record.EventRecorder
KubeconfigPath string
KubeconfigStream []byte
AllowNodeOwnbyMulticluster bool
}

var nodeOwnerMap map[string]string = make(map[string]string)
var mu sync.Mutex
var once sync.Once

func (c *VirtualClusterJoinController) RemoveClusterFinalizer(cluster *v1alpha1.Cluster, kosmosClient versioned.Interface) error {
for _, finalizer := range []string{utils.ClusterStartControllerFinalizer, clusterManager.ControllerFinalizerName} {
Expand All @@ -68,6 +67,57 @@ func (c *VirtualClusterJoinController) RemoveClusterFinalizer(cluster *v1alpha1.
return nil
}

func (c *VirtualClusterJoinController) InitNodeOwnerMap() {
vcList := &v1alpha1.VirtualClusterList{}
err := c.List(context.Background(), vcList)
if err != nil {
klog.Errorf("list virtual cluster error: %v", err)
return
}
for _, vc := range vcList.Items {
if vc.Status.Phase == constants.VirtualClusterStatusCompleted {
kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig)
if err != nil {
klog.Errorf("virtualcluster %s decode target kubernetes kubeconfig %s err: %v", vc.Name, vc.Spec.Kubeconfig, err)
continue
}
kosmosClient, _, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream)
if err != nil {
klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", vc.Name, err)
continue
}
_, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "clusters.kosmos.io", metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("virtualcluster %s get crd clusters.kosmos.io err: %v", vc.Name, err)
}
klog.Infof("virtualcluster %s crd clusters.kosmos.io doesn't exist", vc.Name)
continue
}
clusters, err := kosmosClient.KosmosV1alpha1().Clusters().List(context.Background(), metav1.ListOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Infof("virtualcluster %s get clusters err: %v", vc.Name, err)
}
klog.Infof("virtualcluster %s cluster doesn't exist", vc.Name)
continue
}
mu.Lock()
for _, cluster := range clusters.Items {
for _, node := range cluster.Spec.ClusterTreeOptions.LeafModels {
if vcName, ok := nodeOwnerMap[node.LeafNodeName]; ok && len(vcName) > 0 {
klog.Warningf("node %s also belong to cluster %s", node.LeafNodeName, vcName)
}
nodeOwnerMap[node.LeafNodeName] = vc.Name
}
}
mu.Unlock()
}
klog.Infof("check virtualcluster %s, nodeOwnerMap is %v", vc.Name, nodeOwnerMap)
}
klog.Infof("Init nodeOwnerMap is %v", nodeOwnerMap)
}

func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error {
klog.Infof("Start deleting kosmos-clustertree deployment %s/%s-clustertree-cluster-manager...", request.Namespace, request.Name)
clustertreeDeploy, err := kosmosctl.GenerateDeployment(treemanifest.ClusterTreeClusterManagerDeployment, treemanifest.DeploymentReplace{
Expand Down Expand Up @@ -135,7 +185,7 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context,
if err != nil {
return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err)
}
err, kosmosClient, _, _ := c.InitTargetKubeclient(kubeconfigStream)
kosmosClient, _, _, err := c.InitTargetKubeclient(kubeconfigStream)
if err != nil {
return fmt.Errorf("create kubernetes client failed: %v", err)
}
Expand All @@ -147,7 +197,7 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context,
return fmt.Errorf("get cluster %s failed when we try to del: %v", clusterName, err)
}
} else {
if !c.NodeReuse {
if !c.AllowNodeOwnbyMulticluster {
mu.Lock()
for _, nodeName := range old.Spec.ClusterTreeOptions.LeafModels {
nodeOwnerMap[nodeName.LeafNodeName] = ""
Expand All @@ -170,30 +220,30 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context,
return nil
}

func (c *VirtualClusterJoinController) InitTargetKubeclient(kubeconfigStream []byte) (error, versioned.Interface, kubernetes.Interface, extensionsclient.Interface) {
func (c *VirtualClusterJoinController) InitTargetKubeclient(kubeconfigStream []byte) (versioned.Interface, kubernetes.Interface, extensionsclient.Interface, error) {
//targetKubeconfig := path.Join(DefaultKubeconfigPath, "kubeconfig")
//config, err := utils.RestConfig(targetKubeconfig, "")
config, err := utils.NewConfigFromBytes(kubeconfigStream)
if err != nil {
return fmt.Errorf("generate kubernetes config failed: %s", err), nil, nil, nil
return nil, nil, nil, fmt.Errorf("generate kubernetes config failed: %s", err)
}

kosmosClient, err := versioned.NewForConfig(config)
if err != nil {
return fmt.Errorf("generate Kosmos client failed: %v", err), nil, nil, nil
return nil, nil, nil, fmt.Errorf("generate Kosmos client failed: %v", err)
}

k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("generate K8s basic client failed: %v", err), nil, nil, nil
return nil, nil, nil, fmt.Errorf("generate K8s basic client failed: %v", err)
}

k8sExtensionsClient, err := extensionsclient.NewForConfig(config)
if err != nil {
return fmt.Errorf("generate K8s extensions client failed: %v", err), nil, nil, nil
return nil, nil, nil, fmt.Errorf("generate K8s extensions client failed: %v", err)
}

return nil, kosmosClient, k8sClient, k8sExtensionsClient
return kosmosClient, k8sClient, k8sExtensionsClient, nil
}

func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error {
Expand Down Expand Up @@ -249,57 +299,43 @@ func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request
return nil
}

func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error {
klog.Infof("Attempting to create kosmos-clustertree CRDs for virtualcluster %s/%s...", request.Namespace, request.Name)
clustertreeCluster, err := util.GenerateCustomResourceDefinition(manifest.Cluster, nil)
if err != nil {
return err
func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames []string) {
if !c.AllowNodeOwnbyMulticluster {
mu.Lock()
for _, nodeName := range nodeNames {
nodeOwnerMap[nodeName] = ""
}
mu.Unlock()
}
}

func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error {
kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig)
if err != nil {
return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err)
}
err, kosmosClient, _, k8sExtensionsClient := c.InitTargetKubeclient(kubeconfigStream)
kosmosClient, _, k8sExtensionsClient, err := c.InitTargetKubeclient(kubeconfigStream)
if err != nil {
return fmt.Errorf("crd kubernetes client failed: %v", err)
}

serviceExport, err := util.GenerateCustomResourceDefinition(manifest.ServiceExport, nil)
if err != nil {
return err
}
_, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), serviceExport, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("Create CRD %s for virtualcluster %s/%s failed: %v.",
serviceExport.Name, request.Namespace, request.Name, err)
}
}
klog.Infof("Create CRD %s for virtualcluster %s/%s successful.", serviceExport.Name, request.Namespace, request.Name)

serviceImport, err := util.GenerateCustomResourceDefinition(manifest.ServiceImport, nil)
if err != nil {
return err
}
_, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), serviceImport, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("Create CRD %s for virtualcluster %s/%s failed: %v.",
serviceImport.Name, request.Namespace, request.Name, err)
klog.Infof("Attempting to create kosmos-clustertree CRDs for virtualcluster %s/%s...", request.Namespace, request.Name)
for _, crdToCreate := range []string{manifest.ServiceImport, manifest.Cluster, manifest.ServiceExport} {
crdObject, err := kosmosctl.GenerateCustomResourceDefinition(crdToCreate, nil)
if err != nil {
return err
}
}
klog.Info("Create CRD %s for virtualcluster %s/%s successful.", serviceImport.Name, request.Namespace, request.Name)

_, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), clustertreeCluster, metav1.CreateOptions{})
if err != nil {
if apierrors.IsAlreadyExists(err) {
klog.Warningf("CRD %s is existed, creation process will skip", clustertreeCluster.Name)
_, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), crdObject, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("create CRD %s for virtualcluster %s/%s failed: %v",
crdObject.Name, request.Namespace, request.Name, err)
}
klog.Warningf("CRD %s is existed, creation process will skip", crdObject.Name)
} else {
return fmt.Errorf("CRD create failed for virtualcluster %s/%s: %v", request.Namespace, request.Name, err)
klog.Infof("Create CRD %s for virtualcluster %s/%s successful.", crdObject.Name, request.Namespace, request.Name)
}
}
klog.Infof("Create CRD %s for virtualcluster %s/%s successful.", clustertreeCluster.Name, request.Namespace, request.Name)

clusterName := fmt.Sprintf("virtualcluster-%s-%s", request.Namespace, request.Name)
klog.Infof("Attempting to create cluster %s for %s/%s ...", clusterName, request.Namespace, request.Name)
Expand Down Expand Up @@ -327,18 +363,25 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques
return fmt.Errorf("crd kubernetes client failed: %v", err)
}
var leafModels []v1alpha1.LeafModel
mu.Lock()
newNodeNames := []string{}

for _, nodeName := range vc.Spec.PromoteResources.Nodes {
_, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
if err != nil {
klog.Errorf("node %s doesn't exits: %v", nodeName, err)
continue
}
if !c.NodeReuse && len(nodeOwnerMap) > 0 {
if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 &&
nodeOwner != clusterName {
continue
if !c.AllowNodeOwnbyMulticluster {
mu.Lock()
if len(nodeOwnerMap) > 0 {
if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 &&
nodeOwner != clusterName {
continue
}
}
nodeOwnerMap[nodeName] = clusterName
newNodeNames = append(newNodeNames, nodeName)
mu.Unlock()
}
leafModel := v1alpha1.LeafModel{
LeafNodeName: nodeName,
Expand All @@ -353,37 +396,32 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques
NodeName: nodeName,
},
}
if !c.NodeReuse {
nodeOwnerMap[nodeName] = clusterName
}
leafModels = append(leafModels, leafModel)
}
mu.Unlock()
cluster.Spec.ClusterTreeOptions.LeafModels = leafModels

old, err := kosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
_, err = kosmosClient.KosmosV1alpha1().Clusters().Create(context.TODO(), &cluster, metav1.CreateOptions{})
if err != nil {
c.ClearSomeNodeOwner(newNodeNames)
return fmt.Errorf("create cluster %s failed: %v", clusterName, err)
}
} else {
c.ClearSomeNodeOwner(newNodeNames)
return fmt.Errorf("create cluster %s failed when get it first: %v", clusterName, err)
}
klog.Infof("Cluster %s for %s/%s has been created.", clusterName, request.Namespace, request.Name)
} else {
cluster.ResourceVersion = old.GetResourceVersion()
update, err := kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), &cluster, metav1.UpdateOptions{})
_, err := kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), &cluster, metav1.UpdateOptions{})
if err != nil {
c.ClearSomeNodeOwner(newNodeNames)
return fmt.Errorf("update cluster %s failed: %v", clusterName, err)
} else {
klog.Infof("Cluster %s hase been updated.", clusterName)
}
if !update.DeletionTimestamp.IsZero() {
return fmt.Errorf("cluster %s is deleteting, need requeue", clusterName)
}
klog.Infof("Cluster %s for %s/%s has been updated.", clusterName, request.Namespace, request.Name)
}
klog.Infof("Cluster %s for %s/%s has been created.", clusterName, request.Namespace, request.Name)
return nil
}

Expand Down Expand Up @@ -430,10 +468,13 @@ func (c *VirtualClusterJoinController) InstallClusterTree(ctx context.Context, r
func (c *VirtualClusterJoinController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("============ %s starts to reconcile %s ============", constants.JoinControllerName, request.Name)
defer klog.V(4).Infof("============ %s reconcile finish %s ============", constants.JoinControllerName, request.Name)
if !c.AllowNodeOwnbyMulticluster {
once.Do(c.InitNodeOwnerMap)
}
var vc v1alpha1.VirtualCluster

if err := c.Get(ctx, request.NamespacedName, &vc); err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
klog.Errorf("get %s error: %v", request.NamespacedName, err)
Expand Down

0 comments on commit 4c3010d

Please sign in to comment.