diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go
index df69aca52..efddcb157 100644
--- a/cmd/kubenest/operator/app/operator.go
+++ b/cmd/kubenest/operator/app/operator.go
@@ -91,6 +91,7 @@ func run(ctx context.Context, opts *options.Options) error {
         Config:          mgr.GetConfig(),
         EventRecorder:   mgr.GetEventRecorderFor(constants.InitControllerName),
         HostPortManager: hostPortManager,
+        RootClientSet:   hostKubeClient,
     }
     if err = VirtualClusterInitController.SetupWithManager(mgr); err != nil {
         return fmt.Errorf("error starting %s: %v", constants.InitControllerName, err)
diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
index 21b0bcb6c..2719cf0b4 100644
--- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
+++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
@@ -14,12 +14,11 @@ const (
     Initialized Phase = "Initialized"
     // Completed means everything is ready,kosmos is joined, and resource is promoted
     Completed Phase = "Completed"
-    // ControllerPlaneCompleted means kubernetes control plane is ready,kosmos is joined, and resource is promoted
-    ControllerPlaneCompleted Phase = "ControllerPlaneCompleted"
     // AllNodeReady means all nodes have joined the virtual control plane and are in the running state
     AllNodeReady Phase = "AllNodeReady"
     // Updating means that some changes are happening
     Updating Phase = "Updating"
+    Pending Phase = "Pending"
 )

 // +genclient
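Reviewer note, for illustration only and not part of the patch: with ControllerPlaneCompleted removed, the phase flow driven by the controllers later in this diff is roughly "" (new object) -> Preparing -> Initialized -> AllNodeReady -> Completed, where Pending is set when the post-join pod readiness check times out, Updating is set when a promote-policy change reassigns nodes, and AllNodeReady is presumably set by the node controller once all assigned nodes have joined. A hypothetical VirtualCluster using the promote-policy fields this patch reads (the YAML field names are guessed from the Go field names and may not match the actual serialization):

    apiVersion: kosmos.io/v1alpha1
    kind: VirtualCluster
    metadata:
      name: demo-vc
      namespace: demo-ns
    spec:
      promotePolicies:
        - labelSelector:
            matchLabels:
              node-role.kosmos.io/worker: "true"
          nodeCount: 2

assignNodesByPolicy (added further down) compares nodeCount against the nodes already recorded in spec.promoteResources.nodeInfos and claims free nodes from the node pool whose labels satisfy matchLabels.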
CertificateBlockType = "CERTIFICATE" @@ -61,11 +60,13 @@ const ( VirtualClusterSchedulerReplicas = 1 VirtualClusterSchedulerComponent = "VirtualClusterScheduler" VirtualClusterSchedulerComponentConfigMap = "scheduler-config" - VirtualClusterScheduler = "virtualCluster-scheduler" + VirtualClusterScheduler = "scheduler" + VirtualClusterKubeProxyComponent = "kube-proxy" //controlplane auth - AdminConfig = "admin-config" - KubeConfig = "kubeconfig" + AdminConfig = "admin-config" + KubeConfig = "kubeconfig" + KubeProxyConfigmap = "kube-proxy" //controlplane upload VirtualClusterLabelKeyName = "app.kubernetes.io/managed-by" @@ -77,6 +78,14 @@ const ( InitAction Action = "init" // DeInitAction represents delete virtual cluster instance DeInitAction Action = "deInit" + + ManifestComponentsConfigmap = "components-manifest-cm" + NodePoolConfigmap = "node-pool" + NodeShareState = "share" + NodeVirtualclusterState = "virtualcluster" + NodeFreeState = "free" + + WaitAllPodsRunningTimeoutSeconds = 1800 ) type Action string diff --git a/pkg/kubenest/controller/kosmos/kosmos_join_controller.go b/pkg/kubenest/controller/kosmos/kosmos_join_controller.go index 4c08a58a8..2f8ed3368 100644 --- a/pkg/kubenest/controller/kosmos/kosmos_join_controller.go +++ b/pkg/kubenest/controller/kosmos/kosmos_join_controller.go @@ -75,7 +75,7 @@ func (c *KosmosJoinController) InitNodeOwnerMap() { return } for _, vc := range vcList.Items { - if vc.Status.Phase == constants.VirtualClusterStatusCompleted { + if vc.Status.Phase == v1alpha1.Completed { kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig) if err != nil { klog.Errorf("virtualcluster %s decode target kubernetes kubeconfig %s err: %v", vc.Name, vc.Spec.Kubeconfig, err) @@ -542,7 +542,7 @@ func (c *KosmosJoinController) Reconcile(ctx context.Context, request reconcile. 
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } if vc.DeletionTimestamp.IsZero() { - if vc.Status.Phase != constants.VirtualClusterStatusCompleted { + if vc.Status.Phase != v1alpha1.Completed { klog.Infof("cluster's status is %s, skip", vc.Status.Phase) return reconcile.Result{}, nil } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go index 253237747..713cea397 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go @@ -226,7 +226,7 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques return reconcile.Result{}, nil } - if virtualCluster.Status.Phase != v1alpha1.ControllerPlaneCompleted { + if virtualCluster.Status.Phase != v1alpha1.Initialized { klog.V(4).Infof("virtualcluster wait cluster ready, cluster name: %s", virtualCluster.Name) return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } diff --git a/pkg/kubenest/controller/virtualcluster_execute_controller.go b/pkg/kubenest/controller/virtualcluster_execute_controller.go index 2b76daae9..6bdeb7b6d 100644 --- a/pkg/kubenest/controller/virtualcluster_execute_controller.go +++ b/pkg/kubenest/controller/virtualcluster_execute_controller.go @@ -1,12 +1,9 @@ package controller import ( - "context" - "encoding/base64" "fmt" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" + "github.com/pkg/errors" "k8s.io/client-go/rest" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -56,32 +53,29 @@ func (e *Executor) Execute() error { if err := e.phase.Run(); err != nil { klog.ErrorS(err, "failed to executed the workflow", "workflow", "virtual cluster", klog.KObj(e.virtualCluster)) - } - //TODO modify status for virtualcluster - if err := e.afterRunPhase(); err != nil { - return err + return errors.Wrap(err, "failed to executed the workflow") } klog.InfoS("Successfully executed the workflow", "workflow", "virtual cluster", klog.KObj(e.virtualCluster)) return nil } -func (e *Executor) afterRunPhase() error { - localClusterClient, err := clientset.NewForConfig(e.config) - if err != nil { - return fmt.Errorf("error when creating local cluster client, err: %w", err) - } - secret, err := localClusterClient.CoreV1().Secrets(e.virtualCluster.GetNamespace()).Get(context.TODO(), - fmt.Sprintf("%s-%s", e.virtualCluster.GetName(), constants.AdminConfig), metav1.GetOptions{}) - if err != nil { - return err - } - - kubeconfigBytes := secret.Data[constants.KubeConfig] - configString := base64.StdEncoding.EncodeToString(kubeconfigBytes) - e.virtualCluster.Spec.Kubeconfig = configString - e.virtualCluster.Status.Phase = v1alpha1.Completed - return e.Client.Update(context.TODO(), e.virtualCluster) -} +//func (e *Executor) afterRunPhase() error { +// localClusterClient, err := clientset.NewForConfig(e.config) +// if err != nil { +// return fmt.Errorf("error when creating local cluster client, err: %w", err) +// } +// secret, err := localClusterClient.CoreV1().Secrets(e.virtualCluster.GetNamespace()).Get(context.TODO(), +// fmt.Sprintf("%s-%s", e.virtualCluster.GetName(), constants.AdminConfig), metav1.GetOptions{}) +// if err != nil { +// return err +// } +// +// kubeconfigBytes := secret.Data[constants.KubeConfig] +// configString := base64.StdEncoding.EncodeToString(kubeconfigBytes) +// e.virtualCluster.Spec.Kubeconfig = 
diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go
index 5aa4198ea..f5ff1e1b2 100644
--- a/pkg/kubenest/controller/virtualcluster_init_controller.go
+++ b/pkg/kubenest/controller/virtualcluster_init_controller.go
@@ -2,10 +2,20 @@ package controller

 import (
     "context"
+    "encoding/base64"
+    "fmt"
+    "sync"
     "time"

-    "k8s.io/apimachinery/pkg/api/errors"
+    "github.com/pkg/errors"
+    "gopkg.in/yaml.v3"
+    v1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/clientcmd"
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
     controllerruntime "sigs.k8s.io/controller-runtime"
@@ -27,6 +37,16 @@ type VirtualClusterInitController struct {
     Config          *rest.Config
     EventRecorder   record.EventRecorder
     HostPortManager *vcnodecontroller.HostPortManager
+    RootClientSet   kubernetes.Interface
+}
+
+var lock sync.Mutex
+
+type NodePool struct {
+    Address string            `json:"address" yaml:"address"`
+    Labels  map[string]string `json:"labels" yaml:"labels"`
+    Cluster string            `json:"cluster" yaml:"cluster"`
+    State   string            `json:"state" yaml:"state"`
 }

 func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
@@ -36,26 +56,62 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re
         klog.V(4).InfoS("Finished syncing virtual cluster", "virtual cluster", request, "duration", time.Since(startTime))
     }()

-    virtualCluster := &v1alpha1.VirtualCluster{}
-    if err := c.Get(ctx, request.NamespacedName, virtualCluster); err != nil {
-        if errors.IsNotFound(err) {
+    originalCluster := &v1alpha1.VirtualCluster{}
+    if err := c.Get(ctx, request.NamespacedName, originalCluster); err != nil {
+        if apierrors.IsNotFound(err) {
             klog.V(2).InfoS("Virtual Cluster has been deleted", "Virtual Cluster", request)
             return reconcile.Result{}, nil
         }
         return reconcile.Result{}, nil
     }
+    updatedCluster := originalCluster.DeepCopy()

-    //TODO The object is being deleted
-    //TODO The object is being updated
-
-    if virtualCluster.Status.Phase == constants.VirtualClusterStatusCompleted {
-        klog.Infof("cluster's status is %s, skip", virtualCluster.Status.Phase)
+    //The object is being deleted
+    if !originalCluster.DeletionTimestamp.IsZero() {
+        //TODO
         return reconcile.Result{}, nil
     }

-    err := c.syncVirtualCluster(virtualCluster)
-    if err != nil {
-        return reconcile.Result{}, nil
+    if originalCluster.Status.Phase == "" { //create request
+        updatedCluster.Status.Phase = v1alpha1.Preparing
+        err := c.Client.Patch(context.TODO(), updatedCluster, client.MergeFrom(originalCluster))
+        if err != nil {
+            return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+        }
+
+        err = c.syncVirtualCluster(updatedCluster)
+        if err != nil {
+            return reconcile.Result{}, errors.Wrap(err, "Error syncVirtualCluster")
+        }
+        updatedCluster.Status.Phase = v1alpha1.Initialized
+        err = c.Client.Patch(context.TODO(), updatedCluster, client.MergeFrom(originalCluster))
+        if err != nil {
+            return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+        }
+    } else if originalCluster.Status.Phase == v1alpha1.AllNodeReady {
+        err := c.ensureAllPodsRunning(updatedCluster, constants.WaitAllPodsRunningTimeoutSeconds*time.Second)
+        if err != nil {
+            updatedCluster.Status.Phase = v1alpha1.Pending
+        } else {
+            updatedCluster.Status.Phase = v1alpha1.Completed
+        }
+        err = c.Client.Patch(context.TODO(), updatedCluster, client.MergeFrom(originalCluster))
+        if err != nil {
+            return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+        }
+    } else if originalCluster.Status.Phase == v1alpha1.Completed {
+        //update request, check if promotepolicy nodes increase or decrease.
+        assigned, err := c.assignWorkNodes(updatedCluster)
+        if err != nil {
+            return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name)
+        }
+        if assigned { // indicate nodes change request
+            updatedCluster.Status.Phase = v1alpha1.Updating
+            err = c.Client.Patch(context.TODO(), updatedCluster, client.MergeFrom(originalCluster))
+            if err != nil {
+                return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+            }
+        }
     }
     return reconcile.Result{}, nil
 }
@@ -70,15 +126,225 @@ func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) err
             CreateFunc: func(createEvent event.CreateEvent) bool {
                 return true
             },
+            UpdateFunc: func(updateEvent event.UpdateEvent) bool { return true },
+            DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return true },
         })).
         Complete(c)
 }

+// syncVirtualCluster assigns work nodes, creates the control plane and creates components from manifests
 func (c *VirtualClusterInitController) syncVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) error {
     klog.V(2).Infof("Reconciling virtual cluster", "name", virtualCluster.Name)
+
     executer, err := NewExecutor(virtualCluster, c.Client, c.Config, c.HostPortManager)
     if err != nil {
         return err
     }
-    return executer.Execute()
+    _, err = c.assignWorkNodes(virtualCluster)
+    if err != nil {
+        return errors.Wrap(err, "Error in assign work nodes")
+    }
+    klog.V(2).Infof("Successfully assigned work node for virtual cluster %s", virtualCluster.Name)
+    err = executer.Execute()
+    if err != nil {
+        return err
+    }
+
+    secret, err := c.RootClientSet.CoreV1().Secrets(virtualCluster.GetNamespace()).Get(context.TODO(),
+        fmt.Sprintf("%s-%s", virtualCluster.GetName(), constants.AdminConfig), metav1.GetOptions{})
+    if err != nil {
+        return err
+    }
+    virtualCluster.Spec.Kubeconfig = base64.StdEncoding.EncodeToString(secret.Data[constants.KubeConfig])
+    virtualCluster.Status.Phase = v1alpha1.Completed
+    return nil
+}
+
+// assignWorkNodes assigns nodes for the virtualcluster when creating or updating. Returns true if nodes were successfully assigned
+func (c *VirtualClusterInitController) assignWorkNodes(virtualCluster *v1alpha1.VirtualCluster) (bool, error) {
+    promotepolicies := virtualCluster.Spec.PromotePolicies
+    if len(promotepolicies) == 0 {
+        return false, errors.New("PromotePolicies parameter undefined")
+    }
+    lock.Lock()
+    defer lock.Unlock()
+    nodePool, err := c.getNodePool()
+    klog.V(2).Infof("Get node pool %v", nodePool)
+    if err != nil {
+        return false, errors.Wrap(err, "Get node pool error.")
+    }
+    klog.V(2).Infof("Total %d nodes in pool", len(nodePool))
+    assigned := false
+    for _, policy := range promotepolicies {
+        assignedByPolicy, nodeInfos, err := c.assignNodesByPolicy(virtualCluster, policy, nodePool)
+        if err != nil {
+            return false, errors.Wrap(err, "Reassign nodes error")
+        }
+        if !assignedByPolicy {
+            continue
+        } else {
+            assigned = true
+            virtualCluster.Spec.PromoteResources.NodeInfos = nodeInfos
+        }
+    }
+    if assigned {
+        err := c.updateNodePool(nodePool)
+        if err != nil {
+            return false, errors.Wrap(err, "Update node pool error.")
+        }
+    }
+    return assigned, nil
+}
+
+// getNodePool gets the node pool configmap
+func (c *VirtualClusterInitController) getNodePool() (map[string]NodePool, error) {
+    nodesPoolCm, err := c.RootClientSet.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.NodePoolConfigmap, metav1.GetOptions{})
+    if err != nil {
+        return nil, err
+    }
+    var nodesPool map[string]NodePool
+    data, ok := nodesPoolCm.Data["nodes"]
+    if !ok {
+        return nil, errors.New("Error parse nodes pool data")
+    }
+    err = yaml.Unmarshal([]byte(data), &nodesPool)
+    if err != nil {
+        return nil, errors.Wrap(err, "Unmarshal nodes pool data error")
+    }
+    return nodesPool, nil
+}
+
+// updateNodePool updates the node pool configmap
+func (c *VirtualClusterInitController) updateNodePool(nodePool map[string]NodePool) error {
+    klog.V(2).Infof("Update node pool %v", nodePool)
+    nodePoolYAML, err := yaml.Marshal(nodePool)
+    if err != nil {
+        return errors.Wrap(err, "Serialized node pool data error")
+    }
+
+    originalCm, err := c.RootClientSet.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.NodePoolConfigmap, metav1.GetOptions{})
+    if err != nil {
+        return err
+    }
+    originalCm.Data = map[string]string{
+        "nodes": string(nodePoolYAML),
+    }
+
+    _, err = c.RootClientSet.CoreV1().ConfigMaps(constants.KosmosNs).Update(context.TODO(), originalCm, metav1.UpdateOptions{})
+    if err != nil {
+        return errors.Wrap(err, "Update node pool configmap data error")
+    }
+    klog.V(2).Info("Update node pool Success")
+    return nil
+}
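For illustration only (hypothetical, not taken from this patch): getNodePool and updateNodePool above round-trip a ConfigMap named node-pool in kosmos-system whose "nodes" key maps a node name to the NodePool fields. The node names, addresses and labels below are invented; only the key layout comes from the patch:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: node-pool
      namespace: kosmos-system
    data:
      nodes: |
        node-a:
          address: 192.168.0.1
          labels:
            node-role.kosmos.io/worker: "true"
          cluster: ""
          state: free
        node-b:
          address: 192.168.0.2
          labels:
            node-role.kosmos.io/worker: "true"
          cluster: demo-vc
          state: virtualcluster

A free entry whose labels contain a policy's matchLabels is what assignNodesByPolicy (next) claims; the entry is flipped to state "virtualcluster" and stamped with the owning cluster before updateNodePool writes the map back.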
+
+// assignNodesByPolicy calculates the nodes to add or remove for a promote policy when the virtualcluster is created or updated
+func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, nodesPool map[string]NodePool) (bool, []v1alpha1.NodeInfo, error) {
+    var matched int32 = 0
+    var nodesAssignedMatchedPolicy []v1alpha1.NodeInfo
+    var nodesAssignedUnMatched []v1alpha1.NodeInfo
+    nodesAssigned := virtualCluster.Spec.PromoteResources.NodeInfos
+    for _, nodeInfo := range nodesAssigned {
+        node, ok := nodesPool[nodeInfo.NodeName]
+        if !ok {
+            return false, nodesAssigned, errors.Errorf("Node %s not found in nodes pool", nodeInfo.NodeName)
+        }
+        if mapContains(node.Labels, policy.LabelSelector.MatchLabels) {
+            nodesAssignedMatchedPolicy = append(nodesAssignedMatchedPolicy, nodeInfo)
+            matched++
+        } else {
+            nodesAssignedUnMatched = append(nodesAssignedUnMatched, nodeInfo)
+        }
+    }
+    requestNodesChanged := *policy.NodeCount - matched
+    if requestNodesChanged == 0 {
+        klog.V(2).Infof("Nothing to do for policy %s", policy.LabelSelector.String())
+        return false, nodesAssigned, nil
+    } else if requestNodesChanged > 0 { // nodes need to be increased
+        klog.V(2).Infof("Try allocate %d nodes for policy %s", requestNodesChanged, policy.LabelSelector.String())
+        var cnt int32 = 0
+        for name, nodeInfo := range nodesPool {
+            if nodeInfo.State == constants.NodeFreeState && mapContains(nodeInfo.Labels, policy.LabelSelector.MatchLabels) {
+                nodeInfo.State = constants.NodeVirtualclusterState
+                nodeInfo.Cluster = virtualCluster.Name
+                nodesPool[name] = nodeInfo
+                nodesAssigned = append(nodesAssigned, v1alpha1.NodeInfo{
+                    NodeName: name,
+                })
+                cnt++
+            }
+            if cnt == requestNodesChanged {
+                break
+            }
+        }
+        if cnt < requestNodesChanged {
+            return false, nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, matched)
+        }
+    } else { // nodes need to be decreased
+        klog.V(2).Infof("Try decrease nodes %d for policy %s", -requestNodesChanged, policy.LabelSelector.String())
+        decrease := int(-requestNodesChanged)
+        if len(nodesAssignedMatchedPolicy) < decrease {
+            return false, nodesAssigned, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssignedMatchedPolicy))
+        }
+        nodesAssigned = append(nodesAssignedUnMatched, nodesAssignedMatchedPolicy[:(len(nodesAssignedMatchedPolicy)-decrease)]...)
+        // note: node pool will not be modified here. NodeController will modify it when node deletion succeeds
+    }
+    return true, nodesAssigned, nil
+}
+
+func (c *VirtualClusterInitController) ensureAllPodsRunning(virtualCluster *v1alpha1.VirtualCluster, timeout time.Duration) error {
+    secret, err := c.RootClientSet.CoreV1().Secrets(virtualCluster.GetNamespace()).Get(context.TODO(),
+        fmt.Sprintf("%s-%s", virtualCluster.GetName(), constants.AdminConfig), metav1.GetOptions{})
+    if err != nil {
+        return errors.Wrap(err, "Get virtualcluster kubeconfig secret error")
+    }
+    config, err := clientcmd.RESTConfigFromKubeConfig(secret.Data[constants.KubeConfig])
+    if err != nil {
+        return err
+    }
+    clientset, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        return err
+    }
+
+    namespaceList, err := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
+    if err != nil {
+        return errors.Wrap(err, "List namespaces error")
+    }
+    endTime := time.Now().Add(timeout)
+    for _, namespace := range namespaceList.Items {
+        startTime := time.Now()
+        if startTime.After(endTime) {
+            return errors.New("Timeout waiting for all pods running")
+        }
+        err := wait.PollWithContext(context.TODO(), 5*time.Second, endTime.Sub(startTime), func(ctx context.Context) (done bool, err error) {
+            podList, err := clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{})
+            if apierrors.IsNotFound(err) {
+                return true, nil
+            }
+            if err != nil {
+                return true, errors.Wrap(err, "List pods error")
+            }
+
+            for _, pod := range podList.Items {
+                if pod.Status.Phase != v1.PodRunning {
+                    return false, nil
+                }
+            }
+            return true, nil
+        })
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func mapContains(big map[string]string, small map[string]string) bool {
+    for k, v := range small {
+        if bigV, ok := big[k]; !ok || bigV != v {
+            return false
+        }
+    }
+    return true
 }
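To make the arithmetic in assignNodesByPolicy concrete, using the hypothetical objects sketched above: if demo-vc already lists node-b in spec.promoteResources.nodeInfos and its policy asks for nodeCount: 2, then matched = 1 and requestNodesChanged = 2 - 1 = 1, so one more free node whose labels satisfy matchLabels (node-a) is claimed, flipped to state "virtualcluster", and appended to nodeInfos. Had nodeCount been 0, requestNodesChanged would be -1 and one matched node would be dropped from nodeInfos while its pool entry is left for the node controller to release.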
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/dynamic" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +type ComponentConfig struct { + Name string `json:"name" yaml:"name"` + Path string `json:"path" yaml:"path"` +} + +func NewComponentsFromManifestsTask() workflow.Task { + return workflow.Task{ + Name: "manifests-components", + Run: runComponentsFromManifests, + RunSubTasks: true, + Tasks: []workflow.Task{ + { + Name: "deploy-manifests-components", + Run: applyComponentsManifests, + }, + }, + } +} + +func runComponentsFromManifests(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("manifests-components task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[apiserver] Running manifests-components task", "virtual cluster", klog.KObj(data)) + return nil +} + +func applyComponentsManifests(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct") + } + + secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), + fmt.Sprintf("%s-%s", data.GetName(), constants.AdminConfig), metav1.GetOptions{}) + if err != nil { + return errors.Wrap(err, "Get virtualcluster kubeconfig secret error") + } + config, err := clientcmd.RESTConfigFromKubeConfig(secret.Data[constants.KubeConfig]) + if err != nil { + return err + } + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return err + } + + components, err := getComponentsConfig(data.RemoteClient()) + if err != nil { + return err + } + + for _, component := range components { + klog.V(2).Infof("Deploy component %s", component.Name) + + templatedMapping := make(map[string]interface{}, 2) + if component.Name == constants.VirtualClusterKubeProxyComponent { + templatedMapping["KUBE_PROXY_KUBECONFIG"] = string(secret.Data[constants.KubeConfig]) + } + err = applyTemplatedManifests(dynamicClient, component.Path, templatedMapping) + if err != nil { + return err + } + } + + klog.V(2).InfoS("[manifests-components] Successfully installed virtual cluster manifests-components", "virtual cluster", klog.KObj(data)) + return nil +} + +func getComponentsConfig(client clientset.Interface) ([]ComponentConfig, error) { + cm, err := client.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.Background(), constants.ManifestComponentsConfigmap, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + + yamlData, ok := cm.Data["components"] + if !ok { + return nil, errors.Wrap(err, "Read manifests components config error") + } + + var components []ComponentConfig + err = yaml.Unmarshal([]byte(yamlData), &components) + if err != nil { + return nil, errors.Wrap(err, "Unmarshal manifests component config error") + } + return components, nil +} + +func applyTemplatedManifests(dynamicClient dynamic.Interface, manifestGlob string, templateMapping map[string]interface{}) error { + manifests, err := filepath.Glob(manifestGlob) + klog.V(2).Infof("Component Manifests %s", manifestGlob) + if err != nil { + return err + } + if manifests == nil { + return errors.Errorf("No 
matching file for pattern %v", manifestGlob) + } + for _, manifest := range manifests { + klog.V(2).Infof("Applying %s", manifest) + var obj unstructured.Unstructured + bytesData, err := os.ReadFile(manifest) + if err != nil { + return errors.Wrapf(err, "Read file %s error", manifest) + } + err = yaml.Unmarshal(bytesData, &obj) + if err != nil { + return errors.Wrapf(err, "Unmarshal manifest bytes data error") + } + gvk := obj.GroupVersionKind() + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + if obj.GetName() == constants.KubeProxyConfigmap && gvr.Resource == "configmaps" { + cm := &corev1.ConfigMap{} + err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, cm) + if err != nil { + return errors.Wrapf(err, "Convert unstructured obj to configmap %s error", obj.GetName()) + } + cm.Data["kubeconfig.conf"] = templateMapping["KUBE_PROXY_KUBECONFIG"].(string) + res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(cm) + if err != nil { + return errors.Wrapf(err, "Convert configmap %s to unstructured obj error", obj.GetName()) + } + obj = unstructured.Unstructured{Object: res} + } else { + templatedBytes, err := util.ParseTemplate(string(bytesData), templateMapping) + if err != nil { + return errors.Wrapf(err, "Parse template data %s", string(bytesData)) + } + err = yaml.Unmarshal(templatedBytes, &obj) + if err != nil { + return errors.Wrapf(err, "Unmarshal templatedBytes error") + } + } + + err = createObject(dynamicClient, obj.GetNamespace(), obj.GetName(), &obj) + if err != nil { + return errors.Wrapf(err, "Create object error") + } + } + return nil +} + +func createObject(dynamicClient dynamic.Interface, namespace string, name string, obj *unstructured.Unstructured) error { + gvk := obj.GroupVersionKind() + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + klog.V(2).Infof("Create %s, name: %s, namespace: %s", gvr.String(), name, namespace) + _, err := dynamicClient.Resource(gvr).Namespace(namespace).Create(context.TODO(), obj, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + klog.Warningf("%s %s already exists", gvr.String(), name) + return nil + } else { + return err + } + } + return nil +}
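For illustration only (hypothetical, not taken from this patch): getComponentsConfig expects a ConfigMap named components-manifest-cm in kosmos-system whose "components" key holds a YAML list of ComponentConfig entries. The component names and glob paths below are invented, but "kube-proxy" is the name that triggers the KUBE_PROXY_KUBECONFIG substitution in applyComponentsManifests:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: components-manifest-cm
      namespace: kosmos-system
    data:
      components: |
        - name: kube-proxy
          path: /kosmos/manifest/kube-proxy/*.yaml
        - name: calico
          path: /kosmos/manifest/calico/*.yaml

Each path is passed to filepath.Glob, so it should resolve inside the operator pod; manifests other than the kube-proxy ConfigMap go through util.ParseTemplate with a (currently empty) mapping before being applied via the dynamic client.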