diff --git a/cmd/clustertree-operator/app/tree_operator.go b/cmd/clustertree-operator/app/tree_operator.go index 1b606d1e7..0707c9f60 100644 --- a/cmd/clustertree-operator/app/tree_operator.go +++ b/cmd/clustertree-operator/app/tree_operator.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/spf13/cobra" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/client-go/tools/clientcmd" cliflag "k8s.io/component-base/cli/flag" "k8s.io/klog/v2" @@ -54,9 +55,15 @@ func run(ctx context.Context, opts *options.Options) error { } config.QPS, config.Burst = opts.KubernetesOptions.QPS, opts.KubernetesOptions.Burst + newscheme := scheme.NewSchema() + err = apiextensionsv1.AddToScheme(newscheme) + if err != nil { + panic(err) + } + mgr, err := controllerruntime.NewManager(config, controllerruntime.Options{ Logger: klog.Background(), - Scheme: scheme.NewSchema(), + Scheme: newscheme, LeaderElection: opts.LeaderElection.LeaderElect, LeaderElectionID: opts.LeaderElection.ResourceName, LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace, diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml index 989caedc2..5ad26107a 100644 --- a/deploy/crds/kosmos.io_virtualclusters.yaml +++ b/deploy/crds/kosmos.io_virtualclusters.yaml @@ -13,7 +13,7 @@ spec: listKind: VirtualClusterList plural: virtualclusters singular: virtualcluster - scope: Cluster + scope: Namespaced versions: - name: v1alpha1 schema: @@ -34,6 +34,14 @@ spec: spec: description: Spec is the specification for the behaviour of the VirtualCluster. properties: + hostKubeconfig: + description: HostKubeconfig is the kubeconfig of the host kubernetes's + control plane + type: string + imageRepository: + type: string + kosmosVersion: + type: string kubeconfig: description: Kubeconfig is the kubeconfig of the virtual kubernetes's control plane diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go index 41463b76a..c34f050d7 100644 --- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go @@ -18,7 +18,7 @@ const ( // +genclient // +genclient:nonNamespaced -// +kubebuilder:resource:scope="Cluster" +// +kubebuilder:resource:scope="Namespaced" // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type VirtualCluster struct { @@ -39,10 +39,20 @@ type VirtualClusterSpec struct { // +optional Kubeconfig string `json:"kubeconfig,omitempty"` + // HostKubeconfig is the kubeconfig of the host kubernetes's control plane + // +optional + HostKubeconfig string `json:"hostKubeconfig,omitempty"` + // PromoteResources definites the resources for promote to the kubernetes's control plane, // the resources can be nodes or just cpu,memory or gpu resources // +required PromoteResources PromoteResources `json:"promoteResources"` + + // +optional + ImageRepository string `json:"imageRepository,omitempty"` + + // +optional + KosmosVersion string `json:"kosmosVersion,omitempty"` } type PromoteResources struct { diff --git a/pkg/kosmosctl/manifest/manifest_crds.go b/pkg/kosmosctl/manifest/manifest_crds.go index 1174c1550..6e0b0d346 100644 --- a/pkg/kosmosctl/manifest/manifest_crds.go +++ b/pkg/kosmosctl/manifest/manifest_crds.go @@ -420,6 +420,8 @@ spec: properties: clusterLinkOptions: properties: + autodetectionMethod: + type: string bridgeCIDRs: default: ip: 220.0.0.0/8 @@ -652,7 +654,6 @@ spec: storage: true subresources: {} ` - const NodeConfig = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition diff --git a/pkg/treeoperator/manifest_deployments.go b/pkg/treeoperator/manifest_deployments.go new file mode 100644 index 000000000..249159ace --- /dev/null +++ b/pkg/treeoperator/manifest_deployments.go @@ -0,0 +1,59 @@ +package treeoperator + +const ( + DefaultKubeconfigPath = "/etc/cluster-tree/cert" + ClusterTreeClusterManagerDeployment = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: clustertree-cluster-manager + namespace: {{ .Namespace }} + labels: + app: clustertree-cluster-manager +spec: + replicas: 1 + selector: + matchLabels: + app: clustertree-cluster-manager + template: + metadata: + labels: + app: clustertree-cluster-manager + spec: + serviceAccountName: clustertree + containers: + - name: manager + image: {{ .ImageRepository }}/clustertree-cluster-manager:{{ .Version }} + imagePullPolicy: IfNotPresent + env: + - name: APISERVER_CERT_LOCATION + value: {{ .FilePath }}/cert.pem + - name: APISERVER_KEY_LOCATION + value: {{ .FilePath }}/key.pem + - name: LEAF_NODE_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: credentials + mountPath: "{{ .FilePath }}" + readOnly: true + command: + - clustertree-cluster-manager + - --multi-cluster-service=true + - --v=4 + - --leader-elect-resource-namespace={{ .Namespace }} + - --kubeconfig={{ .FilePath }}/kubeconfig + volumes: + - name: credentials + secret: + secretName: clustertree-cluster-manager +` +) + +type DeploymentReplace struct { + Namespace string + ImageRepository string + Version string + FilePath string +} diff --git a/pkg/treeoperator/manifest_secrets.go b/pkg/treeoperator/manifest_secrets.go new file mode 100644 index 000000000..768affefa --- /dev/null +++ b/pkg/treeoperator/manifest_secrets.go @@ -0,0 +1,23 @@ +package treeoperator + +const ( + ClusterTreeClusterManagerSecret = `--- +apiVersion: v1 +kind: Secret +metadata: + name: clustertree-cluster-manager + namespace: {{ .Namespace }} +type: Opaque +data: + cert.pem: {{ .Cert }} + key.pem: {{ .Key }} + kubeconfig: {{ .Kubeconfig }} +` +) + +type SecretReplace struct { + Namespace string + Cert string + Key string + Kubeconfig string +} diff --git a/pkg/treeoperator/virtualcluster_join_controller.go b/pkg/treeoperator/virtualcluster_join_controller.go index eb9213787..2817d1a77 100644 --- a/pkg/treeoperator/virtualcluster_join_controller.go +++ b/pkg/treeoperator/virtualcluster_join_controller.go @@ -1,9 +1,34 @@ package treeoperator import ( + "context" + "encoding/base64" + "fmt" + + corev1 "k8s.io/api/core/v1" + extensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/cert" + "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" + "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" + kosmosctl "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" + "github.com/kosmos.io/kosmos/pkg/utils" ) const ( @@ -13,8 +38,289 @@ const ( type VirtualClusterJoinController struct { client.Client EventRecorder record.EventRecorder + K8sClient kubernetes.Interface } -func (c *VirtualClusterJoinController) SetupWithManager(mgr manager.Manager) error { +func (c *VirtualClusterJoinController) UninstallClustertree(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { + klog.Info("Start deleting kosmos-clustertree Deployment...") + clustertreeDeploy, err := kosmosctl.GenerateDeployment(manifest.ClusterTreeClusterManagerDeployment, manifest.DeploymentReplace{ + Namespace: request.Namespace, + ImageRepository: vc.Spec.ImageRepository, + Version: vc.Spec.KosmosVersion, + }) + if err != nil { + return err + } + err = c.Get(ctx, request.NamespacedName, clustertreeDeploy) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("get clustertree deployment error, deployment deleted failed: %v", err) + } + } else { + err := c.Delete(ctx, clustertreeDeploy) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("delete kosmos-clustertree Deployment error: %v", err) + } + } + + klog.Info("Deployment has been deleted. ") + + klog.Info("Start deleting kosmos-clustertree secret") + clustertreeSecret, err := kosmosctl.GenerateSecret(manifest.ClusterTreeClusterManagerSecret, manifest.SecretReplace{ + Namespace: request.Namespace, + Cert: cert.GetCrtEncode(), + Key: cert.GetKeyEncode(), + }) + if err != nil { + return err + } + err = c.Get(ctx, request.NamespacedName, clustertreeSecret) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("get clustertree secret error, secret deleted failed: %v", err) + } + } else { + err := c.Delete(ctx, clustertreeSecret) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("delete kosmos-clustertree Deployment error: %v", err) + } + } + klog.Info("Secret has been deleted. ") + return nil +} + +func (c *VirtualClusterJoinController) InitTargetKubeclinet(kubeconfigStream []byte) (error, versioned.Interface, kubernetes.Interface, extensionsclient.Interface) { + //targetKubeconfig := path.Join(DefaultKubeconfigPath, "kubeconfig") + //config, err := utils.RestConfig(targetKubeconfig, "") + config, err := utils.NewConfigFromBytes(kubeconfigStream) + if err != nil { + return fmt.Errorf("generate kubernetes config failed: %s", err), nil, nil, nil + } + + kosmosClient, err := versioned.NewForConfig(config) + if err != nil { + return fmt.Errorf("generate Kosmos client failed: %v", err), nil, nil, nil + } + + k8sClient, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("generate K8s basic client failed: %v", err), nil, nil, nil + } + + k8sExtensionsClient, err := extensionsclient.NewForConfig(config) + if err != nil { + return fmt.Errorf("generate K8s extensions client failed: %v", err), nil, nil, nil + } + + return nil, kosmosClient, k8sClient, k8sExtensionsClient +} + +func (c *VirtualClusterJoinController) InstallClustertree(ctx context.Context, namespace string, vc *v1alpha1.VirtualCluster) error { + klog.Infof("Start creating kosmos-clustertree in namespace %s", namespace) + + klog.Infof("Start creating kosmos-clustertree secret") + clustertreeSecret, err := kosmosctl.GenerateSecret(ClusterTreeClusterManagerSecret, SecretReplace{ + Namespace: namespace, + Cert: cert.GetCrtEncode(), + Key: cert.GetKeyEncode(), + Kubeconfig: vc.Spec.Kubeconfig, + }) + if err != nil { + return err + } + err = c.Create(ctx, clustertreeSecret) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("install clustertree error, secret created failed: %v", err) + } + } + klog.Info("Secret has been created. ") + + klog.Info("Start creating kosmos-clustertree Deployment...") + clustertreeDeploy, err := kosmosctl.GenerateDeployment(ClusterTreeClusterManagerDeployment, DeploymentReplace{ + Namespace: namespace, + ImageRepository: vc.Spec.ImageRepository, + Version: vc.Spec.KosmosVersion, + FilePath: DefaultKubeconfigPath, + }) + if err != nil { + return err + } + err = c.Create(ctx, clustertreeDeploy) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("install clustertree error, deployment created failed: %v", err) + } + } + klog.Info("Deployment has been created. ") + + klog.Info("Attempting to create kosmos-clustertree CRDs...") + clustertreeCluster, err := util.GenerateCustomResourceDefinition(manifest.Cluster, nil) + if err != nil { + return err + } + + kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig) + if err != nil { + return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err) + } + err, kosmosClient, _, k8sExtensionsClient := c.InitTargetKubeclinet(kubeconfigStream) + if err != nil { + return fmt.Errorf("crd kubernetes client failed: %v", err) + } + _, err = k8sExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(context.Background(), clustertreeCluster, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + klog.Warningf("CRD %s is existed, creation process will skip", clustertreeCluster.Name) + } else { + return fmt.Errorf("crd create failed: %v", err) + } + } + klog.Info("Create CRD " + clustertreeCluster.Name + " successful.") + + clusterName := fmt.Sprintf("virtualcluster-%s", namespace) + klog.Infof("Attempting to create cluster %s ...", clusterName) + + kubeconfigStream, err = base64.StdEncoding.DecodeString(vc.Spec.HostKubeconfig) + if err != nil { + return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.HostKubeconfig, err) + } + cluster := v1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + }, + Spec: v1alpha1.ClusterSpec{ + Kubeconfig: kubeconfigStream, + Namespace: namespace, + ImageRepository: vc.Spec.ImageRepository, + ClusterLinkOptions: &v1alpha1.ClusterLinkOptions{ + Enable: false, + NetworkType: v1alpha1.NetWorkTypeGateWay, + IPFamily: v1alpha1.IPFamilyTypeALL, + }, + ClusterTreeOptions: &v1alpha1.ClusterTreeOptions{ + Enable: true, + }, + }, + } + + hostK8sClient, err := utils.NewClientFromBytes(kubeconfigStream) + if err != nil { + return fmt.Errorf("crd kubernetes client failed: %v", err) + } + var leafModels []v1alpha1.LeafModel + for _, nodeName := range vc.Spec.PromoteResources.Nodes { + _, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("node %s doesn't exits: %v", nodeName, err) + continue + } + leafModel := v1alpha1.LeafModel{ + LeafNodeName: nodeName, + Taints: []corev1.Taint{ + { + Effect: utils.KosmosNodeTaintEffect, + Key: utils.KosmosNodeTaintKey, + Value: utils.KosmosNodeValue, + }, + }, + NodeSelector: v1alpha1.NodeSelector{ + NodeName: nodeName, + }, + } + leafModels = append(leafModels, leafModel) + } + cluster.Spec.ClusterTreeOptions.LeafModels = leafModels + + old, err := kosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + _, err = kosmosClient.KosmosV1alpha1().Clusters().Create(context.TODO(), &cluster, metav1.CreateOptions{}) + } + if err != nil { + return fmt.Errorf("create cluster %s failed: %v", clusterName, err) + } + } else { + cluster.ResourceVersion = old.GetResourceVersion() + update, err := kosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), &cluster, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("update cluster %s failed: %v", clusterName, err) + } else { + klog.Infof("Cluster %s hase been updated.", clusterName) + } + if !update.DeletionTimestamp.IsZero() { + return fmt.Errorf("cluster %s is deleteting, need requeue", clusterName) + } + } + klog.Infof("Cluster %s has been created.", clusterName) + return nil } + +func (c *VirtualClusterJoinController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + var vc v1alpha1.VirtualCluster + + if err := c.Get(ctx, request.NamespacedName, &vc); err != nil { + if errors.IsNotFound(err) { + // TODO: we cannot get leaf pod when we donnot known the node name of pod, so delete all ... + return reconcile.Result{}, nil + } + klog.Errorf("get %s error: %v", request.NamespacedName, err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + if vc.DeletionTimestamp.IsZero() { + err := c.InstallClustertree(ctx, request.Namespace, &vc) + if err != nil { + klog.Errorf("install %s error: %v", request.NamespacedName, err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + } else { + err := c.UninstallClustertree(ctx, request, &vc) + if err != nil { + klog.Errorf("uninstall %s error: %v", request.NamespacedName, err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + } + + return reconcile.Result{}, nil +} + +func (c *VirtualClusterJoinController) SetupWithManager(mgr manager.Manager) error { + if c.Client == nil { + c.Client = mgr.GetClient() + } + + skipFunc := func(obj client.Object) bool { + // skip reservedNS + if obj.GetNamespace() == utils.ReservedNS { + return false + } + + p := obj.(*v1alpha1.VirtualCluster) + + if !p.DeletionTimestamp.IsZero() { + return true + } + return true + } + + return ctrl.NewControllerManagedBy(mgr). + Named(JoinControllerName). + WithOptions(controller.Options{}). + For(&v1alpha1.VirtualCluster{}, builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return skipFunc(createEvent.Object) + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + return skipFunc(updateEvent.ObjectNew) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + return skipFunc(deleteEvent.Object) + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + // TODO + return false + }, + })). + Complete(c) +}