From 8e15c0829eb7e6d0ed9243a454b72072885eb422 Mon Sep 17 00:00:00 2001 From: yuleichun2019 Date: Mon, 15 Apr 2024 14:28:58 +0800 Subject: [PATCH] add code for creating virtual clusters for tenants Signed-off-by: yuleichun2019 --- Makefile | 3 + cmd/{ => clustertree}/scheduler/OWNERS | 0 cmd/{ => clustertree}/scheduler/main.go | 0 .../operator}/OWNERS | 0 .../operator/app/operator.go} | 25 +- .../operator}/app/options/options.go | 4 +- .../operator}/app/options/validation.go | 0 .../operator}/main.go | 2 +- deploy/crds/kosmos.io_virtualclusters.yaml | 24 +- deploy/virtual-cluster-operator.yml | 77 +++ examples/vitual-cluster-demo.yaml | 14 +- hack/util.sh | 1 + .../kosmos/v1alpha1/virtualcluster_types.go | 13 +- .../kosmos/v1alpha1/zz_generated.deepcopy.go | 6 +- pkg/kubenest/constants/constant.go | 80 ++++ .../kosmos/kosmos_join_controller.go} | 80 ++-- .../virtualcluster_execute_controller.go} | 43 +- .../virtualcluster_init_controller.go | 73 +-- pkg/kubenest/controlplane/apiserver.go | 63 +++ pkg/kubenest/controlplane/component.go | 191 ++++++++ pkg/kubenest/controlplane/etcd.go | 102 ++++ pkg/kubenest/controlplane/rbac.go | 144 ++++++ pkg/kubenest/controlplane/service.go | 119 +++++ pkg/kubenest/init.go | 194 ++++++++ .../apiserver/mainfests_deployment.go | 113 +++++ .../apiserver/mainfests_service.go | 23 + .../controlplane/etcd/mainfests_deployment.go | 97 ++++ .../controlplane/etcd/mainfests_service.go | 50 ++ .../kube-controller/manifests_deployment.go | 84 ++++ .../controlplane/rbac/manifests_rbac.go | 173 +++++++ .../scheduler/manifest_configmap.go | 53 +++ .../scheduler/manifest_deployment.go | 62 +++ .../manifest/kosmos/manifest_deployment.go} | 3 +- .../manifest/kosmos/manifest_secret.go} | 2 +- pkg/kubenest/tasks/apiserver.go | 82 ++++ pkg/kubenest/tasks/cert.go | 144 ++++++ pkg/kubenest/tasks/check.go | 87 ++++ pkg/kubenest/tasks/component.go | 63 +++ pkg/kubenest/tasks/data.go | 20 + pkg/kubenest/tasks/etcd.go | 77 +++ pkg/kubenest/tasks/service.go | 54 +++ pkg/kubenest/tasks/upload.go | 240 ++++++++++ pkg/kubenest/util/address.go | 84 ++++ pkg/kubenest/util/api-client/check.go | 94 ++++ pkg/kubenest/util/cert/certs.go | 450 ++++++++++++++++++ pkg/kubenest/util/cert/store.go | 93 ++++ pkg/kubenest/util/helper.go | 28 ++ pkg/kubenest/util/image.go | 20 + pkg/kubenest/util/kubeconfig.go | 38 ++ pkg/kubenest/util/template.go | 21 + pkg/kubenest/workflow/phase.go | 99 ++++ pkg/treeoperator/constants/constant.go | 11 - pkg/treeoperator/workflow/phase.go | 25 - pkg/utils/constants.go | 1 + 54 files changed, 3498 insertions(+), 151 deletions(-) rename cmd/{ => clustertree}/scheduler/OWNERS (100%) rename cmd/{ => clustertree}/scheduler/main.go (100%) rename cmd/{clustertree-operator => kubenest/operator}/OWNERS (100%) rename cmd/{clustertree-operator/app/tree_operator.go => kubenest/operator/app/operator.go} (76%) rename cmd/{clustertree-operator => kubenest/operator}/app/options/options.go (89%) rename cmd/{clustertree-operator => kubenest/operator}/app/options/validation.go (100%) rename cmd/{clustertree-operator => kubenest/operator}/main.go (79%) create mode 100644 deploy/virtual-cluster-operator.yml create mode 100644 pkg/kubenest/constants/constant.go rename pkg/{treeoperator/controller/virtualcluster_join_controller.go => kubenest/controller/kosmos/kosmos_join_controller.go} (85%) rename pkg/{treeoperator/workflow/execute.go => kubenest/controller/virtualcluster_execute_controller.go} (55%) rename pkg/{treeoperator => 
kubenest}/controller/virtualcluster_init_controller.go (56%) create mode 100644 pkg/kubenest/controlplane/apiserver.go create mode 100644 pkg/kubenest/controlplane/component.go create mode 100644 pkg/kubenest/controlplane/etcd.go create mode 100644 pkg/kubenest/controlplane/rbac.go create mode 100644 pkg/kubenest/controlplane/service.go create mode 100644 pkg/kubenest/init.go create mode 100644 pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go create mode 100644 pkg/kubenest/manifest/controlplane/apiserver/mainfests_service.go create mode 100644 pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go create mode 100644 pkg/kubenest/manifest/controlplane/etcd/mainfests_service.go create mode 100644 pkg/kubenest/manifest/controlplane/kube-controller/manifests_deployment.go create mode 100644 pkg/kubenest/manifest/controlplane/rbac/manifests_rbac.go create mode 100644 pkg/kubenest/manifest/controlplane/scheduler/manifest_configmap.go create mode 100644 pkg/kubenest/manifest/controlplane/scheduler/manifest_deployment.go rename pkg/{treeoperator/manifest/manifest_deployments.go => kubenest/manifest/kosmos/manifest_deployment.go} (94%) rename pkg/{treeoperator/manifest/manifest_secrets.go => kubenest/manifest/kosmos/manifest_secret.go} (95%) create mode 100644 pkg/kubenest/tasks/apiserver.go create mode 100644 pkg/kubenest/tasks/cert.go create mode 100644 pkg/kubenest/tasks/check.go create mode 100644 pkg/kubenest/tasks/component.go create mode 100644 pkg/kubenest/tasks/data.go create mode 100644 pkg/kubenest/tasks/etcd.go create mode 100644 pkg/kubenest/tasks/service.go create mode 100644 pkg/kubenest/tasks/upload.go create mode 100644 pkg/kubenest/util/address.go create mode 100644 pkg/kubenest/util/api-client/check.go create mode 100644 pkg/kubenest/util/cert/certs.go create mode 100644 pkg/kubenest/util/cert/store.go create mode 100644 pkg/kubenest/util/helper.go create mode 100644 pkg/kubenest/util/image.go create mode 100644 pkg/kubenest/util/kubeconfig.go create mode 100644 pkg/kubenest/util/template.go create mode 100644 pkg/kubenest/workflow/phase.go delete mode 100644 pkg/treeoperator/constants/constant.go delete mode 100644 pkg/treeoperator/workflow/phase.go diff --git a/Makefile b/Makefile index 201f70a55..2fcb7a729 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,7 @@ MACOS_TARGETS := clusterlink-controller-manager \ clusterlink-network-manager \ clusterlink-proxy \ clustertree-cluster-manager \ + virtual-cluster-operator \ scheduler \ # clusterlink-agent and clusterlink-floater only support linux platform @@ -26,6 +27,7 @@ TARGETS := clusterlink-controller-manager \ clusterlink-network-manager \ clusterlink-proxy \ clustertree-cluster-manager \ + virtual-cluster-operator \ scheduler \ # If GOOS is macOS, assign the value of MACOS_TARGETS to TARGETS @@ -123,6 +125,7 @@ upload-images: images docker push ${REGISTRY}/clusterlink-floater:${VERSION} docker push ${REGISTRY}/clusterlink-elector:${VERSION} docker push ${REGISTRY}/clustertree-cluster-manager:${VERSION} + docker push ${REGISTRY}/virtual-cluster-operator:${VERSION} docker push ${REGISTRY}/scheduler:${VERSION} .PHONY: release diff --git a/cmd/scheduler/OWNERS b/cmd/clustertree/scheduler/OWNERS similarity index 100% rename from cmd/scheduler/OWNERS rename to cmd/clustertree/scheduler/OWNERS diff --git a/cmd/scheduler/main.go b/cmd/clustertree/scheduler/main.go similarity index 100% rename from cmd/scheduler/main.go rename to cmd/clustertree/scheduler/main.go diff --git 
a/cmd/clustertree-operator/OWNERS b/cmd/kubenest/operator/OWNERS similarity index 100% rename from cmd/clustertree-operator/OWNERS rename to cmd/kubenest/operator/OWNERS diff --git a/cmd/clustertree-operator/app/tree_operator.go b/cmd/kubenest/operator/app/operator.go similarity index 76% rename from cmd/clustertree-operator/app/tree_operator.go rename to cmd/kubenest/operator/app/operator.go index 4311ceec7..96ca62348 100644 --- a/cmd/clustertree-operator/app/tree_operator.go +++ b/cmd/kubenest/operator/app/operator.go @@ -11,11 +11,12 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" - "github.com/kosmos.io/kosmos/cmd/clustertree-operator/app/options" + "github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller" + kosmos "github.com/kosmos.io/kosmos/pkg/kubenest/controller/kosmos" "github.com/kosmos.io/kosmos/pkg/scheme" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" - "github.com/kosmos.io/kosmos/pkg/treeoperator/constants" - "github.com/kosmos.io/kosmos/pkg/treeoperator/controller" ) func NewVirtualClusterOperatorCommand(ctx context.Context) *cobra.Command { @@ -82,14 +83,16 @@ func run(ctx context.Context, opts *options.Options) error { return fmt.Errorf("error starting %s: %v", constants.InitControllerName, err) } - VirtualClusterJoinController := controller.VirtualClusterJoinController{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(constants.JoinControllerName), - KubeconfigPath: opts.KubernetesOptions.KubeConfig, - AllowNodeOwnbyMulticluster: opts.AllowNodeOwnbyMulticluster, - } - if err = VirtualClusterJoinController.SetupWithManager(mgr); err != nil { - return fmt.Errorf("error starting %s: %v", constants.JoinControllerName, err) + if opts.KosmosJoinController { + KosmosJoinController := kosmos.KosmosJoinController{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(constants.KosmosJoinControllerName), + KubeconfigPath: opts.KubernetesOptions.KubeConfig, + AllowNodeOwnbyMulticluster: opts.AllowNodeOwnbyMulticluster, + } + if err = KosmosJoinController.SetupWithManager(mgr); err != nil { + return fmt.Errorf("error starting %s: %v", constants.KosmosJoinControllerName, err) + } } if err := mgr.Start(ctx); err != nil { diff --git a/cmd/clustertree-operator/app/options/options.go b/cmd/kubenest/operator/app/options/options.go similarity index 89% rename from cmd/clustertree-operator/app/options/options.go rename to cmd/kubenest/operator/app/options/options.go index e54c20300..d37ffc13e 100644 --- a/cmd/clustertree-operator/app/options/options.go +++ b/cmd/kubenest/operator/app/options/options.go @@ -12,6 +12,7 @@ type Options struct { LeaderElection componentbaseconfig.LeaderElectionConfiguration KubernetesOptions KubernetesOptions AllowNodeOwnbyMulticluster bool + KosmosJoinController bool } type KubernetesOptions struct { @@ -38,11 +39,12 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { } flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Start a leader election client and gain leadership before executing the main loop. 
Enable this when running replicated components for high availability.") - flags.StringVar(&o.LeaderElection.ResourceName, "leader-elect-resource-name", "clustertree-operator", "The name of resource object that is used for locking during leader election.") + flags.StringVar(&o.LeaderElection.ResourceName, "leader-elect-resource-name", "operator", "The name of resource object that is used for locking during leader election.") flags.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", utils.DefaultNamespace, "The namespace of resource object that is used for locking during leader election.") flags.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", 40.0, "QPS to use while talking with kube-apiserver.") flags.IntVar(&o.KubernetesOptions.Burst, "kube-burst", 60, "Burst to use while talking with kube-apiserver.") flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.") flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") flags.BoolVar(&o.AllowNodeOwnbyMulticluster, "multiowner", false, "Allow node own by multicluster or not.") + flags.BoolVar(&o.KosmosJoinController, "kosmos-join-controller", false, "Turn on or off kosmos-join-controller.") } diff --git a/cmd/clustertree-operator/app/options/validation.go b/cmd/kubenest/operator/app/options/validation.go similarity index 100% rename from cmd/clustertree-operator/app/options/validation.go rename to cmd/kubenest/operator/app/options/validation.go diff --git a/cmd/clustertree-operator/main.go b/cmd/kubenest/operator/main.go similarity index 79% rename from cmd/clustertree-operator/main.go rename to cmd/kubenest/operator/main.go index 9f356cc6b..090b575d0 100644 --- a/cmd/clustertree-operator/main.go +++ b/cmd/kubenest/operator/main.go @@ -6,7 +6,7 @@ import ( "k8s.io/component-base/cli" ctrl "sigs.k8s.io/controller-runtime" - "github.com/kosmos.io/kosmos/cmd/clustertree-operator/app" + "github.com/kosmos.io/kosmos/cmd/kubenest/operator/app" ) func main() { diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml index c38cffe04..c3ffef5ef 100644 --- a/deploy/crds/kosmos.io_virtualclusters.yaml +++ b/deploy/crds/kosmos.io_virtualclusters.yaml @@ -1,9 +1,10 @@ + --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.0 + controller-gen.kubebuilder.io/version: v0.7.0 creationTimestamp: null name: virtualclusters.kosmos.io spec: @@ -43,11 +44,18 @@ spec: to the kubernetes's control plane, the resources can be nodes or just cpu,memory or gpu resources properties: - nodes: - description: Nodes is the names of node to promote to the kubernetes's - control plane + nodeInfos: + description: NodeInfos is the info of nodes to promote to the + kubernetes's control plane items: - type: string + properties: + address: + description: Address defines node ip + type: string + nodeName: + description: NodeName defines node name + type: string + type: object type: array resources: additionalProperties: @@ -75,3 +83,9 @@ spec: type: object served: true storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/deploy/virtual-cluster-operator.yml b/deploy/virtual-cluster-operator.yml new file mode 100644 index 000000000..6b2382b0b --- /dev/null +++ 
b/deploy/virtual-cluster-operator.yml @@ -0,0 +1,77 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: virtual-cluster-operator + namespace: kosmos-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: virtual-cluster-operator +rules: + - apiGroups: ['*'] + resources: ['*'] + verbs: ["*"] + - nonResourceURLs: ['*'] + verbs: ["get"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: virtual-cluster-operator +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: virtual-cluster-operator +subjects: + - kind: ServiceAccount + name: virtual-cluster-operator + namespace: kosmos-system +--- +apiVersion: v1 +kind: Secret +metadata: + name: virtual-cluster-operator + namespace: kosmos-system +type: Opaque +data: + kubeconfig: __kubeconfig__ + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: virtual-cluster-operator + namespace: kosmos-system + labels: + app: virtual-cluster-operator +spec: + replicas: 1 + selector: + matchLabels: + app: virtual-cluster-operator + template: + metadata: + labels: + app: virtual-cluster-operator + spec: + serviceAccountName: virtual-cluster-operator + containers: + - name: virtual-cluster-operator + image: repo1-cn-beijing.cr.volces.com/virtualcluster/virtualcluter/virtual-cluster-operator:ee1bcd33-dirty + imagePullPolicy: IfNotPresent + env: + - name: IMAGE_REPOSITIRY + value: repo1-cn-beijing.cr.volces.com/virtualcluster/virtualcluter + volumeMounts: + - name: credentials + mountPath: /etc/virtual-cluster-operator + readOnly: true + command: + - virtual-cluster-operator + - --kubeconfig=/etc/virtual-cluster-operator/kubeconfig + - --v=4 + volumes: + - name: credentials + secret: + secretName: virtual-cluster-operator diff --git a/examples/vitual-cluster-demo.yaml b/examples/vitual-cluster-demo.yaml index 60f1092d9..d6800234f 100644 --- a/examples/vitual-cluster-demo.yaml +++ b/examples/vitual-cluster-demo.yaml @@ -5,9 +5,13 @@ metadata: name: test spec: promoteResources: - nodes: - - node1 - - node2 + nodeInfos: + - nodeName: nodeName1 + address: 127.0.0.1 + - nodeName: nodeName2 + address: 127.0.0.2 resources: - cpu: 1 - memory: "1Gi" \ No newline at end of file + cpu: 2 + memory: "1Gi" +status: + phase: Completed diff --git a/hack/util.sh b/hack/util.sh index b2167f52c..04c30c694 100755 --- a/hack/util.sh +++ b/hack/util.sh @@ -22,6 +22,7 @@ CLUSTERLINK_TARGET_SOURCE=( clusterlink-network-manager=cmd/clusterlink/network-manager clusterlink-controller-manager=cmd/clusterlink/controller-manager clustertree-cluster-manager=cmd/clustertree/cluster-manager + virtual-cluster-operator=cmd/kubenest/operator kosmosctl=cmd/kosmosctl ) diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go index 08f1288ce..eca1f19b5 100644 --- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go @@ -45,15 +45,24 @@ type VirtualClusterSpec struct { } type PromoteResources struct { - // Nodes is the names of node to promote to the kubernetes's control plane + // NodeInfos is the info of nodes to promote to the kubernetes's control plane // +optional - Nodes []string `json:"nodes,omitempty"` + NodeInfos []NodeInfo `json:"nodeInfos,omitempty"` // Resources is the resources to promote to the kubernetes's control plane // +optional Resources corev1.ResourceList `json:"resources,omitempty"` } +type NodeInfo struct { + //NodeName defines node name + //+optional + 
NodeName string `json:"nodeName,omitempty"` + //Address defines node ip + //+optional + Address string `json:"address,omitempty"` +} + type VirtualClusterStatus struct { // Phase is the phase of kosmos-operator handling the VirtualCluster // +optional diff --git a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go index 79942057a..c36ad4681 100644 --- a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go @@ -1306,9 +1306,9 @@ func (in *PolicyTerm) DeepCopy() *PolicyTerm { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PromoteResources) DeepCopyInto(out *PromoteResources) { *out = *in - if in.Nodes != nil { - in, out := &in.Nodes, &out.Nodes - *out = make([]string, len(*in)) + if in.NodeInfos != nil { + in, out := &in.NodeInfos, &out.NodeInfos + *out = make([]NodeInfo, len(*in)) copy(*out, *in) } if in.Resources != nil { diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go new file mode 100644 index 000000000..44a47c464 --- /dev/null +++ b/pkg/kubenest/constants/constant.go @@ -0,0 +1,80 @@ +package constants + +import "time" + +const ( + InitControllerName = "virtual-cluster-init-controller" + KosmosJoinControllerName = "kosmos-join-controller" + SystemNs = "kube-system" + DefauleImageRepositoryEnv = "IMAGE_REPOSITIRY" + DefauleImageVersionEnv = "IMAGE_VERSION" + VirtualClusterStatusCompleted = "Completed" + VirtualClusterFinalizerName = "kosmos.io/virtual-cluster-finalizer" + ServiceType = "NodePort" + EtcdServiceType = "ClusterIP" + DisableCascadingDeletionLabel = "operator.virtualcluster.io/disable-cascading-deletion" + ControllerFinalizerName = "operator.virtualcluster.io/finalizer" + DefaultKubeconfigPath = "/etc/cluster-tree/cert" + Label = "virtualCluster-app" + ComponentBeReadyTimeout = 120 * time.Second + + // CertificateBlockType is a possible value for pem.Block.Type. 
+ CertificateBlockType = "CERTIFICATE" + RsaKeySize = 2048 + KeyExtension = ".key" + CertExtension = ".crt" + CertificateValidity = time.Hour * 24 * 365 + CaCertAndKeyName = "ca" + VirtualClusterCertAndKeyName = "virtualCluster" + VirtualClusterSystemNamespace = "virtualCluster-system" + ApiserverCertAndKeyName = "apiserver" + EtcdCaCertAndKeyName = "etcd-ca" + EtcdServerCertAndKeyName = "etcd-server" + EtcdClientCertAndKeyName = "etcd-client" + FrontProxyCaCertAndKeyName = "front-proxy-ca" + FrontProxyClientCertAndKeyName = "front-proxy-client" + + //controlplane apiserver + ApiServer = "apiserver" + ApiServerReplicas = 1 + ApiServerServiceSubnet = "10.237.6.18/29" + ApiServerEtcdListenClientPort = 2379 + ApiServerServiceType = "NodePort" + // APICallRetryInterval defines how long kubeadm should wait before retrying a failed API operation + ApiServerCallRetryInterval = 100 * time.Millisecond + + //controlplane etcd + Etcd = "etcd" + EtcdReplicas = 3 + EtcdDataVolumeName = "etcd-data" + EtcdListenClientPort = 2379 + EtcdListenPeerPort = 2380 + + //controlplane kube-controller + KubeControllerReplicas = 1 + KubeControllerManagerComponent = "KubeControllerManager" + KubeControllerManager = "kube-controller-manager" + + //controlplane scheduler + VirtualClusterSchedulerReplicas = 1 + VirtualClusterSchedulerComponent = "VirtualClusterScheduler" + VirtualClusterSchedulerComponentConfigMap = "scheduler-config" + VirtualClusterScheduler = "virtualCluster-scheduler" + + //controlplane auth + AdminConfig = "admin-config" + KubeConfig = "kubeconfig" + + //controlplane upload + VirtualClusterLabelKeyName = "app.kubernetes.io/managed-by" + VirtualClusterController = "virtual-cluster-controller" + ClusterName = "virtualCluster-apiserver" + UserName = "virtualCluster-admin" + + // InitAction represents init virtual cluster instance + InitAction Action = "init" + // DeInitAction represents delete virtual cluster instance + DeInitAction Action = "deInit" +) + +type Action string diff --git a/pkg/treeoperator/controller/virtualcluster_join_controller.go b/pkg/kubenest/controller/kosmos/kosmos_join_controller.go similarity index 85% rename from pkg/treeoperator/controller/virtualcluster_join_controller.go rename to pkg/kubenest/controller/kosmos/kosmos_join_controller.go index 590112809..4c08a58a8 100644 --- a/pkg/treeoperator/controller/virtualcluster_join_controller.go +++ b/pkg/kubenest/controller/kosmos/kosmos_join_controller.go @@ -30,15 +30,15 @@ import ( "github.com/kosmos.io/kosmos/pkg/cert" clusterManager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" - "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" + file "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" kosmosctl "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" - "github.com/kosmos.io/kosmos/pkg/treeoperator/constants" - treemanifest "github.com/kosmos.io/kosmos/pkg/treeoperator/manifest" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + manifest "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/kosmos" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/version" ) -type VirtualClusterJoinController struct { +type KosmosJoinController struct { client.Client EventRecorder record.EventRecorder KubeconfigPath string @@ -50,7 +50,7 @@ var nodeOwnerMap map[string]string = make(map[string]string) var mu sync.Mutex var once sync.Once -func (c *VirtualClusterJoinController) RemoveClusterFinalizer(cluster *v1alpha1.Cluster, kosmosClient 
versioned.Interface) error { +func (c *KosmosJoinController) RemoveClusterFinalizer(cluster *v1alpha1.Cluster, kosmosClient versioned.Interface) error { for _, finalizer := range []string{utils.ClusterStartControllerFinalizer, clusterManager.ControllerFinalizerName} { if controllerutil.ContainsFinalizer(cluster, finalizer) { controllerutil.RemoveFinalizer(cluster, finalizer) @@ -67,7 +67,7 @@ func (c *VirtualClusterJoinController) RemoveClusterFinalizer(cluster *v1alpha1. return nil } -func (c *VirtualClusterJoinController) InitNodeOwnerMap() { +func (c *KosmosJoinController) InitNodeOwnerMap() { vcList := &v1alpha1.VirtualClusterList{} err := c.List(context.Background(), vcList) if err != nil { @@ -118,14 +118,14 @@ func (c *VirtualClusterJoinController) InitNodeOwnerMap() { klog.Infof("Init nodeOwnerMap is %v", nodeOwnerMap) } -func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { +func (c *KosmosJoinController) UninstallClusterTree(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { klog.Infof("Start deleting kosmos-clustertree deployment %s/%s-clustertree-cluster-manager...", request.Namespace, request.Name) - clustertreeDeploy, err := kosmosctl.GenerateDeployment(treemanifest.ClusterTreeClusterManagerDeployment, treemanifest.DeploymentReplace{ + clustertreeDeploy, err := kosmosctl.GenerateDeployment(manifest.ClusterTreeClusterManagerDeployment, manifest.DeploymentReplace{ Namespace: request.Namespace, ImageRepository: "null", Version: "null", Name: request.Name, - FilePath: treemanifest.DefaultKubeconfigPath, + FilePath: constants.DefaultKubeconfigPath, }) if err != nil { return err @@ -153,7 +153,7 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, klog.Infof("Deployment %s/%s-clustertree-cluster-manager has been deleted. 
", request.Namespace, request.Name) klog.Infof("Start deleting kosmos-clustertree secret %s/%s-clustertree-cluster-manager", request.Namespace, request.Name) - clustertreeSecret, err := kosmosctl.GenerateSecret(treemanifest.ClusterTreeClusterManagerSecret, treemanifest.SecretReplace{ + clustertreeSecret, err := kosmosctl.GenerateSecret(manifest.ClusterTreeClusterManagerSecret, manifest.SecretReplace{ Namespace: request.Namespace, Cert: cert.GetCrtEncode(), Key: cert.GetKeyEncode(), @@ -220,7 +220,7 @@ func (c *VirtualClusterJoinController) UninstallClusterTree(ctx context.Context, return nil } -func (c *VirtualClusterJoinController) InitTargetKubeclient(kubeconfigStream []byte) (versioned.Interface, kubernetes.Interface, extensionsclient.Interface, error) { +func (c *KosmosJoinController) InitTargetKubeclient(kubeconfigStream []byte) (versioned.Interface, kubernetes.Interface, extensionsclient.Interface, error) { //targetKubeconfig := path.Join(DefaultKubeconfigPath, "kubeconfig") //config, err := utils.RestConfig(targetKubeconfig, "") config, err := utils.NewConfigFromBytes(kubeconfigStream) @@ -246,9 +246,9 @@ func (c *VirtualClusterJoinController) InitTargetKubeclient(kubeconfigStream []b return kosmosClient, k8sClient, k8sExtensionsClient, nil } -func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { +func (c *KosmosJoinController) DeployKosmos(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { klog.Infof("Start creating kosmos-clustertree secret %s/%s-clustertree-cluster-manager", request.Namespace, request.Name) - clustertreeSecret, err := kosmosctl.GenerateSecret(treemanifest.ClusterTreeClusterManagerSecret, treemanifest.SecretReplace{ + clustertreeSecret, err := kosmosctl.GenerateSecret(manifest.ClusterTreeClusterManagerSecret, manifest.SecretReplace{ Namespace: request.Namespace, Cert: cert.GetCrtEncode(), Key: cert.GetKeyEncode(), @@ -278,11 +278,11 @@ func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request if len(imageVersion) == 0 { imageVersion = fmt.Sprintf("v%s", version.GetReleaseVersion().PatchRelease()) } - clustertreeDeploy, err := kosmosctl.GenerateDeployment(treemanifest.ClusterTreeClusterManagerDeployment, treemanifest.DeploymentReplace{ + clustertreeDeploy, err := kosmosctl.GenerateDeployment(manifest.ClusterTreeClusterManagerDeployment, manifest.DeploymentReplace{ Namespace: request.Namespace, ImageRepository: imageRepository, Version: imageVersion, - FilePath: treemanifest.DefaultKubeconfigPath, + FilePath: constants.DefaultKubeconfigPath, Name: request.Name, }) if err != nil { @@ -299,7 +299,7 @@ func (c *VirtualClusterJoinController) DeployKosmos(ctx context.Context, request return nil } -func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames *[]string) { +func (c *KosmosJoinController) ClearSomeNodeOwner(nodeNames *[]string) { if !c.AllowNodeOwnbyMulticluster { mu.Lock() for _, nodeName := range *nodeNames { @@ -309,7 +309,7 @@ func (c *VirtualClusterJoinController) ClearSomeNodeOwner(nodeNames *[]string) { } } -func (c *VirtualClusterJoinController) CreateClusterObject(ctx context.Context, request reconcile.Request, +func (c *KosmosJoinController) CreateClusterObject(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster, hostK8sClient kubernetes.Interface, cluster *v1alpha1.Cluster) (*[]string, *map[string]struct{}, error) { var leafModels []v1alpha1.LeafModel // recored new nodes' name, 
if error happen before create or update, need clear newNodeNames @@ -318,36 +318,36 @@ func (c *VirtualClusterJoinController) CreateClusterObject(ctx context.Context, // compare all nodes in cluster cr to all node exits in virtual cluster,we can find which ndoe should be deleted allNodeNamesMap := map[string]struct{}{} - for _, nodeName := range vc.Spec.PromoteResources.Nodes { - _, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + for _, nodeInfo := range vc.Spec.PromoteResources.NodeInfos { + _, err := hostK8sClient.CoreV1().Nodes().Get(context.Background(), nodeInfo.NodeName, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { - klog.Warningf("node %s doesn't exits: %v", nodeName, err) + klog.Warningf("node %s doesn't exist: %v", nodeInfo.NodeName, err) continue } c.ClearSomeNodeOwner(&newNodeNames) - klog.Errorf("get node %s error: %v", nodeName, err) + klog.Errorf("get node %s error: %v", nodeInfo.NodeName, err) return nil, nil, err } if !c.AllowNodeOwnbyMulticluster { mu.Lock() if len(nodeOwnerMap) > 0 { - if nodeOwner, existed := nodeOwnerMap[nodeName]; existed && len(nodeOwner) > 0 { + if nodeOwner, existed := nodeOwnerMap[nodeInfo.NodeName]; existed && len(nodeOwner) > 0 { if nodeOwner != cluster.Name { continue } } else { - newNodeNames = append(newNodeNames, nodeName) + newNodeNames = append(newNodeNames, nodeInfo.NodeName) } } else { - newNodeNames = append(newNodeNames, nodeName) + newNodeNames = append(newNodeNames, nodeInfo.NodeName) } - allNodeNamesMap[nodeName] = struct{}{} - nodeOwnerMap[nodeName] = cluster.Name + allNodeNamesMap[nodeInfo.NodeName] = struct{}{} + nodeOwnerMap[nodeInfo.NodeName] = cluster.Name mu.Unlock() } leafModel := v1alpha1.LeafModel{ - LeafNodeName: nodeName, + LeafNodeName: nodeInfo.NodeName, Taints: []corev1.Taint{ { Effect: utils.KosmosNodeTaintEffect, @@ -356,7 +356,7 @@ func (c *VirtualClusterJoinController) CreateClusterObject(ctx context.Context, }, }, NodeSelector: v1alpha1.NodeSelector{ - NodeName: nodeName, + NodeName: nodeInfo.NodeName, }, } leafModels = append(leafModels, leafModel) @@ -368,7 +368,7 @@ func (c *VirtualClusterJoinController) CreateClusterObject(ctx context.Context, return &newNodeNames, &allNodeNamesMap, nil } -func (c *VirtualClusterJoinController) CreateOrUpdateCluster(ctx context.Context, request reconcile.Request, +func (c *KosmosJoinController) CreateOrUpdateCluster(ctx context.Context, request reconcile.Request, kosmosClient versioned.Interface, k8sClient kubernetes.Interface, newNodeNames *[]string, allNodeNamesMap *map[string]struct{}, cluster *v1alpha1.Cluster) error { old, err := kosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), cluster.Name, metav1.GetOptions{}) @@ -415,7 +415,7 @@ func (c *VirtualClusterJoinController) CreateOrUpdateCluster(ctx context.Context return nil } -func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { +func (c *KosmosJoinController) CreateCluster(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { kubeconfigStream, err := base64.StdEncoding.DecodeString(vc.Spec.Kubeconfig) if err != nil { return fmt.Errorf("decode target kubernetes kubeconfig %s err: %v", vc.Spec.Kubeconfig, err) } @@ -427,8 +427,8 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques // create crd cluster.kosmos.io klog.Infof("Attempting to create kosmos-clustertree CRDs for virtualcluster %s/%s...", 
request.Namespace, request.Name) - for _, crdToCreate := range []string{manifest.ServiceImport, manifest.Cluster, - manifest.ServiceExport, manifest.PodConversionCRD, manifest.PodConvertPolicyCRD} { + for _, crdToCreate := range []string{file.ServiceImport, file.Cluster, + file.ServiceExport, file.PodConversionCRD, file.PodConvertPolicyCRD} { crdObject, err := kosmosctl.GenerateCustomResourceDefinition(crdToCreate, nil) if err != nil { return err @@ -486,7 +486,7 @@ func (c *VirtualClusterJoinController) CreateCluster(ctx context.Context, reques return nil } -func (c *VirtualClusterJoinController) AddFinalizer(ctx context.Context, vc *v1alpha1.VirtualCluster) error { +func (c *KosmosJoinController) AddFinalizer(ctx context.Context, vc *v1alpha1.VirtualCluster) error { vcNew := vc.DeepCopy() if controllerutil.AddFinalizer(vcNew, constants.VirtualClusterFinalizerName) { err := c.Update(ctx, vcNew) @@ -498,7 +498,7 @@ func (c *VirtualClusterJoinController) AddFinalizer(ctx context.Context, vc *v1a return nil } -func (c *VirtualClusterJoinController) RemoveFinalizer(ctx context.Context, vc *v1alpha1.VirtualCluster) error { +func (c *KosmosJoinController) RemoveFinalizer(ctx context.Context, vc *v1alpha1.VirtualCluster) error { vcNew := vc.DeepCopy() if controllerutil.ContainsFinalizer(vcNew, constants.VirtualClusterFinalizerName) { controllerutil.RemoveFinalizer(vcNew, constants.VirtualClusterFinalizerName) @@ -511,7 +511,7 @@ func (c *VirtualClusterJoinController) RemoveFinalizer(ctx context.Context, vc * return nil } -func (c *VirtualClusterJoinController) InstallClusterTree(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { +func (c *KosmosJoinController) InstallClusterTree(ctx context.Context, request reconcile.Request, vc *v1alpha1.VirtualCluster) error { klog.Infof("Start creating kosmos-clustertree in namespace %s", request.Namespace) defer klog.Infof("Finish creating kosmos-clustertree in namespace %s", request.Namespace) if err := c.DeployKosmos(ctx, request, vc); err != nil { @@ -526,9 +526,9 @@ func (c *VirtualClusterJoinController) InstallClusterTree(ctx context.Context, r return nil } -func (c *VirtualClusterJoinController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - klog.V(4).Infof("============ %s starts to reconcile %s ============", constants.JoinControllerName, request.Name) - defer klog.V(4).Infof("============ %s reconcile finish %s ============", constants.JoinControllerName, request.Name) +func (c *KosmosJoinController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + klog.V(4).Infof("============ %s starts to reconcile %s ============", constants.KosmosJoinControllerName, request.Name) + defer klog.V(4).Infof("============ %s reconcile finish %s ============", constants.KosmosJoinControllerName, request.Name) if !c.AllowNodeOwnbyMulticluster { once.Do(c.InitNodeOwnerMap) } @@ -562,7 +562,7 @@ func (c *VirtualClusterJoinController) Reconcile(ctx context.Context, request re return reconcile.Result{}, nil } -func (c *VirtualClusterJoinController) SetupWithManager(mgr manager.Manager) error { +func (c *KosmosJoinController) SetupWithManager(mgr manager.Manager) error { if c.Client == nil { c.Client = mgr.GetClient() } @@ -578,7 +578,7 @@ func (c *VirtualClusterJoinController) SetupWithManager(mgr manager.Manager) err c.KubeconfigStream = kubeconfigStream return ctrl.NewControllerManagedBy(mgr). - Named(constants.JoinControllerName). 
+ Named(constants.KosmosJoinControllerName). WithOptions(controller.Options{}). For(&v1alpha1.VirtualCluster{}, builder.WithPredicates(predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { diff --git a/pkg/treeoperator/workflow/execute.go b/pkg/kubenest/controller/virtualcluster_execute_controller.go similarity index 55% rename from pkg/treeoperator/workflow/execute.go rename to pkg/kubenest/controller/virtualcluster_execute_controller.go index 0f0e15209..9d4d22e88 100644 --- a/pkg/treeoperator/workflow/execute.go +++ b/pkg/kubenest/controller/virtualcluster_execute_controller.go @@ -1,4 +1,4 @@ -package workflow +package controller import ( "context" @@ -12,18 +12,36 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/kubenest" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" ) type Executer struct { client.Client virtualCluster *v1alpha1.VirtualCluster - phase *Phase + phase *workflow.Phase config *rest.Config } func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config) (*Executer, error) { - var phase *Phase - //TODO Initializes data and tasks + var phase *workflow.Phase + + action := recognizeActionFor(virtualCluster) + switch action { + case constants.InitAction: + opts := []kubenest.InitOpt{ + kubenest.NewInitOptWithVirtualCluster(virtualCluster), + kubenest.NewInitOptWithKubeconfig(config), + } + options := kubenest.NewPhaseInitOptions(opts...) + phase = kubenest.NewInitPhase(options) + case constants.DeInitAction: + //TODO deinit + default: + return nil, fmt.Errorf("failed to recognize action for virtual cluster %s", virtualCluster.Name) + } + return &Executer{ virtualCluster: virtualCluster, Client: c, @@ -38,7 +56,7 @@ func (e *Executer) Execute() error { if err := e.phase.Run(); err != nil { klog.ErrorS(err, "failed to executed the workflow", "workflow", "virtual cluster", klog.KObj(e.virtualCluster)) } - + //TODO modify status for virtualcluster if err := e.afterRunPhase(); err != nil { return err } @@ -51,13 +69,22 @@ func (e *Executer) afterRunPhase() error { if err != nil { return fmt.Errorf("error when creating local cluster client, err: %w", err) } - - secret, err := localClusterClient.CoreV1().Secrets(e.virtualCluster.GetNamespace()).Get(context.TODO(), fmt.Sprintf("%s-%s", e.virtualCluster.GetName(), "admin-config"), metav1.GetOptions{}) + secret, err := localClusterClient.CoreV1().Secrets(e.virtualCluster.GetNamespace()).Get(context.TODO(), + fmt.Sprintf("%s-%s", e.virtualCluster.GetName(), constants.AdminConfig), metav1.GetOptions{}) if err != nil { return err } - e.virtualCluster.Spec.Kubeconfig = base64.StdEncoding.EncodeToString(secret.Data["kubeconfig"]) + kubeconfigBytes := secret.Data[constants.KubeConfig] + configString := base64.StdEncoding.EncodeToString(kubeconfigBytes) + e.virtualCluster.Spec.Kubeconfig = configString e.virtualCluster.Status.Phase = v1alpha1.Completed return e.Client.Update(context.TODO(), e.virtualCluster) } + +func recognizeActionFor(virtualCluster *v1alpha1.VirtualCluster) constants.Action { + if !virtualCluster.DeletionTimestamp.IsZero() { + return constants.DeInitAction + } + return constants.InitAction +} diff --git a/pkg/treeoperator/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go similarity index 56% rename from pkg/treeoperator/controller/virtualcluster_init_controller.go 
rename to pkg/kubenest/controller/virtualcluster_init_controller.go index 775fe9ff6..95168da20 100644 --- a/pkg/treeoperator/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -2,7 +2,7 @@ package controller import ( "context" - "reflect" + "time" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/rest" @@ -18,8 +18,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/treeoperator/constants" - "github.com/kosmos.io/kosmos/pkg/treeoperator/workflow" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" ) type VirtualClusterInitController struct { @@ -28,52 +27,54 @@ type VirtualClusterInitController struct { EventRecorder record.EventRecorder } -var predicatesFunc = predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - return true - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { - newObj := updateEvent.ObjectNew.(*v1alpha1.VirtualCluster) - oldObj := updateEvent.ObjectOld.(*v1alpha1.VirtualCluster) +func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + startTime := time.Now() + klog.V(4).InfoS("Started syncing virtual cluster", "virtual cluster", request, "startTime", startTime) + defer func() { + klog.V(4).InfoS("Finished syncing virtual cluster", "virtual cluster", request, "duration", time.Since(startTime)) + }() - if !newObj.DeletionTimestamp.IsZero() { - return true + virtualCluster := &v1alpha1.VirtualCluster{} + if err := c.Get(ctx, request.NamespacedName, virtualCluster); err != nil { + if errors.IsNotFound(err) { + klog.V(2).InfoS("Virtual Cluster has been deleted", "Virtual Cluster", request) + return reconcile.Result{}, nil } + return reconcile.Result{}, err + } - return !reflect.DeepEqual(newObj.Spec, oldObj.Spec) - }, - GenericFunc: func(genericEvent event.GenericEvent) bool { - return false - }, + //TODO The object is being deleted + //TODO The object is being updated + + if virtualCluster.Status.Phase == constants.VirtualClusterStatusCompleted { + klog.Infof("cluster's status is %s, skip", virtualCluster.Status.Phase) + return reconcile.Result{}, nil + } + + err := c.syncVirtualCluster(virtualCluster) + if err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{}, nil } func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) error { return controllerruntime.NewControllerManagedBy(mgr). Named(constants.InitControllerName). WithOptions(controller.Options{}). - For(&v1alpha1.VirtualCluster{}, builder.WithPredicates(predicatesFunc)). + For(&v1alpha1.VirtualCluster{}, + builder.WithPredicates(predicate.Funcs{ + // UpdateFunc: c.onVirtualClusterUpdate, + CreateFunc: func(createEvent event.CreateEvent) bool { + return true + }, + })). 
Complete(c) } -func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - klog.V(4).Infof("============ %s starts to reconcile %s ============", constants.InitControllerName, request.Name) - - virtualCluster := &v1alpha1.VirtualCluster{} - if err := c.Get(ctx, request.NamespacedName, virtualCluster); err != nil { - if errors.IsNotFound(err) { - klog.V(2).InfoS("Virtual Cluster has been deleted", "Virtual Cluster", request) - return controllerruntime.Result{}, nil - } - return controllerruntime.Result{}, err - } - - virtualClusterCopy := virtualCluster.DeepCopy() - return controllerruntime.Result{}, c.syncVirtualCluster(virtualClusterCopy) -} - func (c *VirtualClusterInitController) syncVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) error { - klog.V(2).Infof("Reconciling Virtual Cluster", "name", virtualCluster.Name) - executer, err := workflow.NewExecuter(virtualCluster, c.Client, c.Config) + klog.V(2).InfoS("Reconciling virtual cluster", "name", virtualCluster.Name) + executer, err := NewExecuter(virtualCluster, c.Client, c.Config) if err != nil { return err } diff --git a/pkg/kubenest/controlplane/apiserver.go b/pkg/kubenest/controlplane/apiserver.go new file mode 100644 index 000000000..141480cde --- /dev/null +++ b/pkg/kubenest/controlplane/apiserver.go @@ -0,0 +1,63 @@ +package controlplane + +import ( + "errors" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + kuberuntime "k8s.io/apimachinery/pkg/runtime" + clientset "k8s.io/client-go/kubernetes" + clientsetscheme "k8s.io/client-go/kubernetes/scheme" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" +) + +var errAllocated = errors.New("provided port is already allocated") + +func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string) error { + if err := installAPIServer(client, name, namespace); err != nil { + return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err) + } + return nil +} + +func installAPIServer(client clientset.Interface, name, namespace string) error { + imageRepository, imageVersion := util.GetImageMessage() + err, clusterIps := util.GetEtcdServiceClusterIp(namespace, client) + if err != nil { + return err + } + + apiserverDeploymentBytes, err := util.ParseTemplate(apiserver.ApiserverDeployment, struct { + DeploymentName, Namespace, ImageRepository, EtcdClientService, Version string + ServiceSubnet, VirtualClusterCertsSecret, EtcdCertsSecret string + Replicas int32 + EtcdListenClientPort int32 + }{ + DeploymentName: fmt.Sprintf("%s-%s", name, "apiserver"), + Namespace: namespace, + ImageRepository: imageRepository, + Version: imageVersion, + EtcdClientService: clusterIps[1], + ServiceSubnet: constants.ApiServerServiceSubnet, + VirtualClusterCertsSecret: fmt.Sprintf("%s-%s", name, "cert"), + EtcdCertsSecret: fmt.Sprintf("%s-%s", name, "etcd-cert"), + Replicas: constants.ApiServerReplicas, + EtcdListenClientPort: constants.ApiServerEtcdListenClientPort, + }) + if err != nil { + return fmt.Errorf("error when parsing virtual cluster apiserver deployment template: %w", err) + } + + apiserverDeployment := &appsv1.Deployment{} + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), apiserverDeploymentBytes, apiserverDeployment); err != nil { + return fmt.Errorf("error when decoding virtual cluster apiserver deployment: %w", err) + } + + if err := 
util.CreateOrUpdateDeployment(client, apiserverDeployment); err != nil { + return fmt.Errorf("error when creating deployment for %s, err: %w", apiserverDeployment.Name, err) + } + return nil +} diff --git a/pkg/kubenest/controlplane/component.go b/pkg/kubenest/controlplane/component.go new file mode 100644 index 000000000..0175c9ead --- /dev/null +++ b/pkg/kubenest/controlplane/component.go @@ -0,0 +1,191 @@ +package controlplane + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kuberuntime "k8s.io/apimachinery/pkg/runtime" + clientset "k8s.io/client-go/kubernetes" + clientsetscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + controller "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/kube-controller" + scheduler "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/scheduler" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" +) + +func EnsureControlPlaneComponent(component, name, namespace string, client clientset.Interface) error { + configMaps, err := getComponentConfigMapManifests(name, namespace) + if err != nil { + return err + } + configMap, ok := configMaps[constants.VirtualClusterSchedulerComponentConfigMap] + if !ok { + klog.Infof("Skip installing component configMap %s(%s/%s)", component, namespace, name) + return nil + } + + if err := createOrUpdateConfigMap(client, configMap); err != nil { + return fmt.Errorf("failed to create configMap resource for component %s, err: %w", component, err) + } + + deployments, err := getComponentManifests(name, namespace) + if err != nil { + return err + } + + deployment, ok := deployments[component] + if !ok { + klog.Infof("Skip installing component %s(%s/%s)", component, namespace, name) + return nil + } + + if err := createOrUpdateDeployment(client, deployment); err != nil { + return fmt.Errorf("failed to create deployment resource for component %s, err: %w", component, err) + } + + return nil +} + +func getComponentManifests(name, namespace string) (map[string]*appsv1.Deployment, error) { + kubeControllerManager, err := getKubeControllerManagerManifest(name, namespace) + if err != nil { + return nil, err + } + virtualClusterScheduler, err := getVirtualClusterSchedulerManifest(name, namespace) + if err != nil { + return nil, err + } + + manifest := map[string]*appsv1.Deployment{ + constants.KubeControllerManagerComponent: kubeControllerManager, + constants.VirtualClusterSchedulerComponent: virtualClusterScheduler, + } + + return manifest, nil +} + +func getComponentConfigMapManifests(name, namespace string) (map[string]*v1.ConfigMap, error) { + virtualClusterSchedulerConfigMap, err := getVirtualClusterSchedulerConfigMapManifest(name, namespace) + if err != nil { + return nil, err + } + + manifest := map[string]*v1.ConfigMap{ + constants.VirtualClusterSchedulerComponentConfigMap: virtualClusterSchedulerConfigMap, + } + + return manifest, nil +} + +func getKubeControllerManagerManifest(name, namespace string) (*appsv1.Deployment, error) { + imageRepository, imageVersion := util.GetImageMessage() + kubeControllerManagerBytes, err := util.ParseTemplate(controller.KubeControllerManagerDeployment, struct { + DeploymentName, Namespace, ImageRepository, Version string + VirtualClusterCertsSecret, KubeconfigSecret string + Replicas int32 + }{ + DeploymentName: fmt.Sprintf("%s-%s", name, "kube-controller-manager"), + 
Namespace: namespace, + ImageRepository: imageRepository, + Version: imageVersion, + VirtualClusterCertsSecret: fmt.Sprintf("%s-%s", name, "cert"), + KubeconfigSecret: fmt.Sprintf("%s-%s", name, "admin-config"), + Replicas: constants.KubeControllerReplicas, + }) + if err != nil { + return nil, fmt.Errorf("error when parsing kube-controller-manager deployment template: %w", err) + } + + kcm := &appsv1.Deployment{} + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), kubeControllerManagerBytes, kcm); err != nil { + return nil, fmt.Errorf("err when decoding kube-controller-manager deployment: %w", err) + } + + return kcm, nil +} + +func getVirtualClusterSchedulerConfigMapManifest(name, namespace string) (*v1.ConfigMap, error) { + virtualClusterSchedulerConfigMapBytes, err := util.ParseTemplate(scheduler.VirtualClusterSchedulerConfigMap, struct { + DeploymentName, Namespace string + }{ + DeploymentName: fmt.Sprintf("%s-%s", name, "virtualcluster-scheduler"), + Namespace: namespace, + }) + if err != nil { + return nil, fmt.Errorf("error when parsing virtualCluster-scheduler configMap template: %w", err) + } + + scheduler := &v1.ConfigMap{} + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), virtualClusterSchedulerConfigMapBytes, scheduler); err != nil { + return nil, fmt.Errorf("err when decoding virtualCluster-scheduler configMap: %w", err) + } + + return scheduler, nil +} + +func getVirtualClusterSchedulerManifest(name, namespace string) (*appsv1.Deployment, error) { + imageRepository, imageVersion := util.GetImageMessage() + virtualClusterSchedulerBytes, err := util.ParseTemplate(scheduler.VirtualClusterSchedulerDeployment, struct { + Replicas int32 + DeploymentName, Namespace, SystemNamespace, ImageRepository, Version string + Image, KubeconfigSecret string + }{ + DeploymentName: fmt.Sprintf("%s-%s", name, "virtualcluster-scheduler"), + Namespace: namespace, + SystemNamespace: constants.SystemNs, + ImageRepository: imageRepository, + Version: imageVersion, + KubeconfigSecret: fmt.Sprintf("%s-%s", name, "admin-config"), + Replicas: constants.VirtualClusterSchedulerReplicas, + }) + if err != nil { + return nil, fmt.Errorf("error when parsing virtualCluster-scheduler deployment template: %w", err) + } + + scheduler := &appsv1.Deployment{} + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), virtualClusterSchedulerBytes, scheduler); err != nil { + return nil, fmt.Errorf("err when decoding virtualCluster-scheduler deployment: %w", err) + } + + return scheduler, nil +} + +func createOrUpdateDeployment(client clientset.Interface, deployment *appsv1.Deployment) error { + _, err := client.AppsV1().Deployments(deployment.GetNamespace()).Create(context.TODO(), deployment, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + + _, err := client.AppsV1().Deployments(deployment.GetNamespace()).Update(context.TODO(), deployment, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + klog.V(5).InfoS("Successfully created or updated deployment", "deployment", deployment.GetName()) + return nil +} + +func createOrUpdateConfigMap(client clientset.Interface, configMap *v1.ConfigMap) error { + _, err := client.CoreV1().ConfigMaps(configMap.GetNamespace()).Create(context.TODO(), configMap, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + + _, err := client.CoreV1().ConfigMaps(configMap.GetNamespace()).Update(context.TODO(), 
configMap, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + klog.V(5).InfoS("Successfully created or updated configMap", "configMap", configMap.GetName()) + return nil +} diff --git a/pkg/kubenest/controlplane/etcd.go b/pkg/kubenest/controlplane/etcd.go new file mode 100644 index 000000000..fedd13b2a --- /dev/null +++ b/pkg/kubenest/controlplane/etcd.go @@ -0,0 +1,102 @@ +package controlplane + +import ( + "context" + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kuberuntime "k8s.io/apimachinery/pkg/runtime" + clientset "k8s.io/client-go/kubernetes" + clientsetscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/component-base/cli/flag" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/etcd" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" +) + +func EnsureVirtualClusterEtcd(client clientset.Interface, name, namespace string) error { + if err := installEtcd(client, name, namespace); err != nil { + return err + } + return nil +} + +func installEtcd(client clientset.Interface, name, namespace string) error { + imageRepository, imageVersion := util.GetImageMessage() + initialClusters := make([]string, constants.EtcdReplicas) + for index := range initialClusters { + memberName := fmt.Sprintf("%s-%d", fmt.Sprintf("%s-%s", name, "etcd"), index) + // build etcd member cluster peer url + memberPeerURL := fmt.Sprintf("http://%s.%s.%s.svc.cluster.local:%v", + memberName, + fmt.Sprintf("%s-%s", name, "etcd"), + namespace, + constants.EtcdListenPeerPort, + ) + + initialClusters[index] = fmt.Sprintf("%s=%s", memberName, memberPeerURL) + } + + etcdStatefulSetBytes, err := util.ParseTemplate(etcd.EtcdStatefulSet, struct { + StatefulSetName, Namespace, ImageRepository, Image, EtcdClientService, Version string + CertsSecretName, EtcdPeerServiceName string + InitialCluster, EtcdDataVolumeName, EtcdCipherSuites string + Replicas, EtcdListenClientPort, EtcdListenPeerPort int32 + }{ + StatefulSetName: fmt.Sprintf("%s-%s", name, "etcd"), + Namespace: namespace, + ImageRepository: imageRepository, + Version: imageVersion, + EtcdClientService: fmt.Sprintf("%s-%s", name, "etcd-client"), + CertsSecretName: fmt.Sprintf("%s-%s", name, "etcd-cert"), + EtcdPeerServiceName: fmt.Sprintf("%s-%s", name, "etcd"), + EtcdDataVolumeName: constants.EtcdDataVolumeName, + InitialCluster: strings.Join(initialClusters, ","), + EtcdCipherSuites: strings.Join(flag.PreferredTLSCipherNames(), ","), + Replicas: constants.EtcdReplicas, + EtcdListenClientPort: constants.EtcdListenClientPort, + EtcdListenPeerPort: constants.EtcdListenPeerPort, + }) + if err != nil { + return fmt.Errorf("error when parsing Etcd statefulset template: %w", err) + } + + etcdStatefulSet := &appsv1.StatefulSet{} + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdStatefulSetBytes, etcdStatefulSet); err != nil { + return fmt.Errorf("error when decoding Etcd StatefulSet: %w", err) + } + + if err := createOrUpdateStatefulSet(client, etcdStatefulSet); err != nil { + return fmt.Errorf("error when creating Etcd statefulset, err: %w", err) + } + + return nil +} + +func createOrUpdateStatefulSet(client clientset.Interface, statefulSet *appsv1.StatefulSet) error { + _, err := client.AppsV1().StatefulSets(statefulSet.GetNamespace()).Create(context.TODO(), statefulSet, metav1.CreateOptions{}) + if err != nil { + if 
!apierrors.IsAlreadyExists(err) { + return err + } + + older, err := client.AppsV1().StatefulSets(statefulSet.GetNamespace()).Get(context.TODO(), statefulSet.GetName(), metav1.GetOptions{}) + if err != nil { + return err + } + + statefulSet.ResourceVersion = older.ResourceVersion + _, err = client.AppsV1().StatefulSets(statefulSet.GetNamespace()).Update(context.TODO(), statefulSet, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + klog.V(5).InfoS("Successfully created or updated statefulset", "statefulset", statefulSet.GetName()) + return nil +} diff --git a/pkg/kubenest/controlplane/rbac.go b/pkg/kubenest/controlplane/rbac.go new file mode 100644 index 000000000..2bf8d7729 --- /dev/null +++ b/pkg/kubenest/controlplane/rbac.go @@ -0,0 +1,144 @@ +package controlplane + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kuberuntime "k8s.io/apimachinery/pkg/runtime" + clientset "k8s.io/client-go/kubernetes" + clientsetscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/rbac" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" +) + +func EnsureVirtualSchedulerRBAC(client clientset.Interface, namespace string) error { + if err := grantVirtualClusterResourceClusterSA(client, namespace); err != nil { + return err + } + if err := grantVirtualClusterResourceClusterRoleBinding(client, namespace); err != nil { + return err + } + if err := grantVirtualClusterResourceClusterRole(client); err != nil { + return err + } + return nil +} + +func grantVirtualClusterResourceClusterSA(client clientset.Interface, namespace string) error { + virtualClusterResourceClusterSABytes, err := util.ParseTemplate(rbac.VirtualSchedulerSA, struct { + Namespace string + }{ + Namespace: namespace, + }) + if err != nil { + return fmt.Errorf("error when parsing virtualCluster-scheduler sa template: %w", err) + } + serviceAccount := &v1.ServiceAccount{} + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), []byte(virtualClusterResourceClusterSABytes), serviceAccount); err != nil { + return fmt.Errorf("err when decoding virtualCluster-scheduler ServiceAccount: %w", err) + } + return createOrUpdateClusterSA(client, serviceAccount, namespace) +} + +func grantVirtualClusterResourceClusterRoleBinding(client clientset.Interface, namespace string) error { + virtualClusterResourceClusterRoleBindingBytes, err := util.ParseTemplate(rbac.VirtualSchedulerRoleBinding, struct { + Namespace string + }{ + Namespace: namespace, + }) + if err != nil { + return fmt.Errorf("error when parsing virtualCluster-scheduler role binding template: %w", err) + } + viewClusterRoleBinding := &rbacv1.ClusterRoleBinding{} + + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), []byte(virtualClusterResourceClusterRoleBindingBytes), viewClusterRoleBinding); err != nil { + return fmt.Errorf("err when decoding virtualCluster scheduler Clusterrole Binding: %w", err) + } + return createOrUpdateClusterRoleBinding(client, viewClusterRoleBinding) +} + +func grantVirtualClusterResourceClusterRole(client clientset.Interface) error { + viewClusterrole := &rbacv1.ClusterRole{} + if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), []byte(rbac.VirtualSchedulerRole), viewClusterrole); err != nil { + return fmt.Errorf("err when decoding virtualCluster scheduler Clusterrole: %w", err) + } + return 
+}
+
+func createOrUpdateClusterSA(client clientset.Interface, serviceAccount *v1.ServiceAccount, namespace string) error {
+	_, err := client.CoreV1().ServiceAccounts(namespace).Create(context.TODO(), serviceAccount, metav1.CreateOptions{})
+
+	if err != nil {
+		if !apierrors.IsAlreadyExists(err) {
+			return err
+		}
+
+		older, err := client.CoreV1().ServiceAccounts(namespace).Get(context.TODO(), serviceAccount.GetName(), metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		serviceAccount.ResourceVersion = older.ResourceVersion
+		_, err = client.CoreV1().ServiceAccounts(namespace).Update(context.TODO(), serviceAccount, metav1.UpdateOptions{})
+		if err != nil {
+			return err
+		}
+	}
+
+	klog.V(4).InfoS("Successfully created or updated serviceAccount", "serviceAccount", serviceAccount.GetName())
+	return nil
+}
+
+func createOrUpdateClusterRole(client clientset.Interface, clusterrole *rbacv1.ClusterRole) error {
+	_, err := client.RbacV1().ClusterRoles().Create(context.TODO(), clusterrole, metav1.CreateOptions{})
+
+	if err != nil {
+		if !apierrors.IsAlreadyExists(err) {
+			return err
+		}
+
+		older, err := client.RbacV1().ClusterRoles().Get(context.TODO(), clusterrole.GetName(), metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		clusterrole.ResourceVersion = older.ResourceVersion
+		_, err = client.RbacV1().ClusterRoles().Update(context.TODO(), clusterrole, metav1.UpdateOptions{})
+		if err != nil {
+			return err
+		}
+	}
+
+	klog.V(4).InfoS("Successfully created or updated clusterrole", "clusterrole", clusterrole.GetName())
+	return nil
+}
+
+func createOrUpdateClusterRoleBinding(client clientset.Interface, clusterroleBinding *rbacv1.ClusterRoleBinding) error {
+	_, err := client.RbacV1().ClusterRoleBindings().Create(context.TODO(), clusterroleBinding, metav1.CreateOptions{})
+
+	if err != nil {
+		if !apierrors.IsAlreadyExists(err) {
+			return err
+		}
+
+		older, err := client.RbacV1().ClusterRoleBindings().Get(context.TODO(), clusterroleBinding.GetName(), metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		clusterroleBinding.ResourceVersion = older.ResourceVersion
+		_, err = client.RbacV1().ClusterRoleBindings().Update(context.TODO(), clusterroleBinding, metav1.UpdateOptions{})
+		if err != nil {
+			return err
+		}
+	}
+
+	klog.V(4).InfoS("Successfully created or updated clusterrolebinding", "clusterrolebinding", clusterroleBinding.GetName())
+	return nil
+}
diff --git a/pkg/kubenest/controlplane/service.go b/pkg/kubenest/controlplane/service.go
new file mode 100644
index 000000000..3ac2ffb7d
--- /dev/null
+++ b/pkg/kubenest/controlplane/service.go
@@ -0,0 +1,119 @@
+package controlplane
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kuberuntime "k8s.io/apimachinery/pkg/runtime"
+	clientset "k8s.io/client-go/kubernetes"
+	clientsetscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/klog/v2"
+
+	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/etcd"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
+)
+
+func EnsureVirtualClusterService(client clientset.Interface, name, namespace string) error {
+	if err := createServerService(client, name, namespace); err != nil {
+		return fmt.Errorf("failed to create virtual cluster apiserver-service, err: %w", err)
+	}
+	return nil
+}
+
+func createServerService(client clientset.Interface, name, namespace string) error {
+	apiserverServiceBytes, err := util.ParseTemplate(apiserver.ApiserverService, struct {
+		ServiceName, Namespace, ServiceType string
+	}{
+		ServiceName: fmt.Sprintf("%s-%s", name, "apiserver"),
+		Namespace:   namespace,
+		ServiceType: constants.ApiServerServiceType,
+	})
+	if err != nil {
+		return fmt.Errorf("error when parsing virtualClusterApiserver service template: %w", err)
+	}
+
+	apiserverService := &corev1.Service{}
+	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), apiserverServiceBytes, apiserverService); err != nil {
+		return fmt.Errorf("error when decoding virtual cluster apiserver service: %w", err)
+	}
+	if err := createOrUpdateService(client, apiserverService); err != nil {
+		return fmt.Errorf("err when creating virtual cluster apiserver service for %s, err: %w", apiserverService.Name, err)
+	}
+
+	etcdServicePeerBytes, err := util.ParseTemplate(etcd.EtcdPeerService, struct {
+		ServiceName, Namespace                   string
+		EtcdListenClientPort, EtcdListenPeerPort int32
+	}{
+		ServiceName:          fmt.Sprintf("%s-%s", name, "etcd"),
+		Namespace:            namespace,
+		EtcdListenClientPort: constants.EtcdListenClientPort,
+		EtcdListenPeerPort:   constants.EtcdListenPeerPort,
+	})
+	if err != nil {
+		return fmt.Errorf("error when parsing Etcd peer service template: %w", err)
+	}
+
+	etcdPeerService := &corev1.Service{}
+	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdServicePeerBytes, etcdPeerService); err != nil {
+		return fmt.Errorf("error when decoding Etcd peer service: %w", err)
+	}
+
+	if err := createOrUpdateService(client, etcdPeerService); err != nil {
+		return fmt.Errorf("error when creating etcd peer service, err: %w", err)
+	}
+
+	etcdClientServiceBytes, err := util.ParseTemplate(etcd.EtcdClientService, struct {
+		ServiceName, Namespace string
+		EtcdListenClientPort   int32
+	}{
+		ServiceName:          fmt.Sprintf("%s-%s", name, "etcd-client"),
+		Namespace:            namespace,
+		EtcdListenClientPort: constants.EtcdListenClientPort,
+	})
+	if err != nil {
+		return fmt.Errorf("error when parsing Etcd client service template: %w", err)
+	}
+
+	etcdClientService := &corev1.Service{}
+	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdClientServiceBytes, etcdClientService); err != nil {
+		return fmt.Errorf("err when decoding Etcd client service: %w", err)
+	}
+
+	if err := createOrUpdateService(client, etcdClientService); err != nil {
+		return fmt.Errorf("err when creating etcd client service, err: %w", err)
+	}
+
+	return nil
+}
+
+func createOrUpdateService(client clientset.Interface, service *corev1.Service) error {
+	_, err := client.CoreV1().Services(service.GetNamespace()).Create(context.TODO(), service, metav1.CreateOptions{})
+	if err != nil {
+		if !apierrors.IsAlreadyExists(err) {
+			if apierrors.IsInvalid(err) && strings.Contains(err.Error(), errAllocated.Error()) {
+				klog.V(2).ErrorS(err, "failed to create or update service", "service", klog.KObj(service))
+				return nil
+			}
+			return fmt.Errorf("unable to create Service: %v", err)
+		}
+
+		older, err := client.CoreV1().Services(service.GetNamespace()).Get(context.TODO(), service.GetName(), metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		service.ResourceVersion = older.ResourceVersion
+		if _, err := client.CoreV1().Services(service.GetNamespace()).Update(context.TODO(), service, metav1.UpdateOptions{}); err != nil {
+			return fmt.Errorf("unable to update Service: %v", err)
+		}
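+		// NOTE: the live object's resourceVersion is carried over above so the update
+		// passes the API server's optimistic-concurrency check; spec.clusterIP is
+		// immutable, so only mutable Service fields can actually change on this path.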
}
+
+	klog.V(5).InfoS("Successfully created or updated service", "service", service.GetName())
+	return nil
+}
diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go
new file mode 100644
index 000000000..4fd1b98e1
--- /dev/null
+++ b/pkg/kubenest/init.go
@@ -0,0 +1,194 @@
+package kubenest
+
+import (
+	"errors"
+	"fmt"
+
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	utilversion "k8s.io/apimachinery/pkg/util/version"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+
+	"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
+	"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/tasks"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
+	"github.com/kosmos.io/kosmos/pkg/utils"
+)
+
+var _ tasks.InitData = &initData{}
+
+type initData struct {
+	cert.CertStore
+	name                  string
+	namespace             string
+	virtualClusterVersion *utilversion.Version
+	controlplaneAddr      string
+	clusterIps            []string
+	remoteClient          clientset.Interface
+	kosmosClient          versioned.Interface
+	virtualClusterDataDir string
+	privateRegistry       string
+}
+
+type InitOptions struct {
+	Name                  string
+	Namespace             string
+	Kubeconfig            *rest.Config
+	virtualClusterVersion string
+	virtualClusterDataDir string
+	virtualCluster        *v1alpha1.VirtualCluster
+}
+
+func NewInitPhase(opts *InitOptions) *workflow.Phase {
+	initPhase := workflow.NewPhase()
+
+	initPhase.AppendTask(tasks.NewVirtualClusterServiceTask())
+	initPhase.AppendTask(tasks.NewCertTask())
+	initPhase.AppendTask(tasks.NewUploadCertsTask())
+	initPhase.AppendTask(tasks.NewEtcdTask())
+	initPhase.AppendTask(tasks.NewVirtualClusterApiserverTask())
+	initPhase.AppendTask(tasks.NewUploadKubeconfigTask())
+	initPhase.AppendTask(tasks.NewCheckApiserverHealthTask())
+	initPhase.AppendTask(tasks.NewComponentTask())
+	initPhase.AppendTask(tasks.NewCheckControlPlaneTask())
+
+	initPhase.SetDataInitializer(func() (workflow.RunData, error) {
+		return newRunData(opts)
+	})
+	return initPhase
+}
+
+type InitOpt func(o *InitOptions)
+
+func NewPhaseInitOptions(opts ...InitOpt) *InitOptions {
+	options := defaultJobInitOptions()
+
+	for _, c := range opts {
+		c(options)
+	}
+	return options
+}
+
+func defaultJobInitOptions() *InitOptions {
+	virtualCluster := &v1alpha1.VirtualCluster{}
+	return &InitOptions{
+		virtualClusterVersion: "0.0.0",
+		virtualClusterDataDir: "/var/lib/virtualCluster",
+		virtualCluster:        virtualCluster,
+	}
+}
+
+func NewInitOptWithVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) InitOpt {
+	return func(o *InitOptions) {
+		o.virtualCluster = virtualCluster
+		o.Name = virtualCluster.GetName()
+		o.Namespace = virtualCluster.GetNamespace()
+	}
+}
+
+func NewInitOptWithKubeconfig(config *rest.Config) InitOpt {
+	return func(o *InitOptions) {
+		o.Kubeconfig = config
+	}
+}
+
+func newRunData(opt *InitOptions) (*initData, error) {
+	if err := opt.Validate(); err != nil {
+		return nil, err
+	}
+
+	localClusterClient, err := clientset.NewForConfig(opt.Kubeconfig)
+	if err != nil {
+		return nil, fmt.Errorf("error when creating local cluster client, err: %w", err)
+	}
+	var remoteClient clientset.Interface = localClusterClient
+
+	kosmosClient, err := versioned.NewForConfig(opt.Kubeconfig)
+	if err != nil {
+		return nil, fmt.Errorf("error when creating kosmos client, err: %w", err)
+	}
+
+	version, err := utilversion.ParseGeneric(opt.virtualClusterVersion)
+	if err != nil {
+		return nil,
fmt.Errorf("unexpected virtual cluster invalid version %s", opt.virtualClusterVersion) + } + + var address string + address, err = util.GetAPIServiceIP(remoteClient) + if err != nil { + return nil, fmt.Errorf("failed to get a valid node IP for APIServer, err: %w", err) + } + var clusterIps []string + err, clusterIp := util.GetAPIServiceClusterIp(opt.Namespace, remoteClient) + clusterIps = append(clusterIps, clusterIp) + if err != nil { + return nil, fmt.Errorf("failed to get APIServer Service-ClusterIp, err: %w", err) + } + return &initData{ + name: opt.Name, + namespace: opt.Namespace, + virtualClusterVersion: version, + controlplaneAddr: address, + clusterIps: clusterIps, + remoteClient: remoteClient, + kosmosClient: kosmosClient, + virtualClusterDataDir: opt.virtualClusterDataDir, + privateRegistry: utils.DefaultImageRepository, + CertStore: cert.NewCertStore(), + }, nil +} + +// TODO Add more detailed verification content +func (opt *InitOptions) Validate() error { + var errs []error + + if len(opt.Name) == 0 || len(opt.Namespace) == 0 { + return errors.New("unexpected empty name or namespace") + } + + _, err := utilversion.ParseGeneric(opt.virtualClusterVersion) + if err != nil { + return fmt.Errorf("unexpected virtual cluster invalid version %s", opt.virtualClusterVersion) + } + + return utilerrors.NewAggregate(errs) +} + +func (i initData) GetName() string { + return i.name +} + +func (i initData) GetNamespace() string { + return i.namespace +} + +func (i initData) ControlplaneAddress() string { + return i.controlplaneAddr +} + +func (i initData) ServiceClusterIp() []string { + err, clusterIps := util.GetServiceClusterIp(i.namespace, i.remoteClient) + if err != nil { + return nil + } + return clusterIps +} + +func (i initData) RemoteClient() clientset.Interface { + return i.remoteClient +} + +func (i initData) KosmosClient() versioned.Interface { + return i.kosmosClient +} + +func (i initData) DataDir() string { + return i.virtualClusterDataDir +} + +func (i initData) VirtualClusterVersion() string { + return i.virtualClusterVersion.String() +} diff --git a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go new file mode 100644 index 000000000..b54f1a752 --- /dev/null +++ b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go @@ -0,0 +1,113 @@ +package apiserver + +const ( + ApiserverDeployment = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + virtualCluster-app: apiserver + app.kubernetes.io/managed-by: virtual-cluster-controller + name: {{ .DeploymentName }} + namespace: {{ .Namespace }} +spec: + replicas: {{ .Replicas }} + selector: + matchLabels: + virtualCluster-app: apiserver + template: + metadata: + labels: + virtualCluster-app: apiserver + spec: + automountServiceAccountToken: false + hostNetwork: true + containers: + - name: kube-apiserver + image: {{ .ImageRepository }}/kube-apiserver:{{ .Version }} + imagePullPolicy: IfNotPresent + command: + - kube-apiserver + - --allow-privileged=true + - --authorization-mode=Node,RBAC + - --client-ca-file=/etc/virtualcluster/pki/ca.crt + - --enable-admission-plugins=NodeRestriction + - --enable-bootstrap-token-auth=true + - --etcd-cafile=/etc/etcd/pki/etcd-ca.crt + - --etcd-certfile=/etc/etcd/pki/etcd-client.crt + - --etcd-keyfile=/etc/etcd/pki/etcd-client.key + #- --etcd-servers=https://{{ .EtcdClientService }}.{{ .Namespace }}.svc.cluster.local:{{ .EtcdListenClientPort }} + - --etcd-servers=https://{{ 
+        - --bind-address=0.0.0.0
+        - --kubelet-client-certificate=/etc/virtualcluster/pki/virtualCluster.crt
+        - --kubelet-client-key=/etc/virtualcluster/pki/virtualCluster.key
+        - --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname
+        - --secure-port=5443
+        - --service-account-issuer=https://kubernetes.default.svc.cluster.local
+        - --service-account-key-file=/etc/virtualcluster/pki/virtualCluster.key
+        - --service-account-signing-key-file=/etc/virtualcluster/pki/virtualCluster.key
+        - --service-cluster-ip-range={{ .ServiceSubnet }}
+        - --proxy-client-cert-file=/etc/virtualcluster/pki/front-proxy-client.crt
+        - --proxy-client-key-file=/etc/virtualcluster/pki/front-proxy-client.key
+        - --requestheader-allowed-names=front-proxy-client
+        - --requestheader-client-ca-file=/etc/virtualcluster/pki/front-proxy-ca.crt
+        - --requestheader-extra-headers-prefix=X-Remote-Extra-
+        - --requestheader-group-headers=X-Remote-Group
+        - --requestheader-username-headers=X-Remote-User
+        - --tls-cert-file=/etc/virtualcluster/pki/apiserver.crt
+        - --tls-private-key-file=/etc/virtualcluster/pki/apiserver.key
+        - --tls-min-version=VersionTLS13
+        - --max-requests-inflight=1500
+        - --max-mutating-requests-inflight=500
+        - --v=4
+        livenessProbe:
+          failureThreshold: 8
+          httpGet:
+            path: /livez
+            port: 5443
+            scheme: HTTPS
+          initialDelaySeconds: 10
+          periodSeconds: 10
+          successThreshold: 1
+          timeoutSeconds: 15
+        readinessProbe:
+          failureThreshold: 3
+          httpGet:
+            path: /readyz
+            port: 5443
+            scheme: HTTPS
+          initialDelaySeconds: 10
+          periodSeconds: 10
+          successThreshold: 1
+          timeoutSeconds: 15
+        ports:
+        - containerPort: 5443
+          name: http
+          protocol: TCP
+        volumeMounts:
+        - mountPath: /etc/virtualcluster/pki
+          name: apiserver-cert
+          readOnly: true
+        - mountPath: /etc/etcd/pki
+          name: etcd-cert
+          readOnly: true
+      affinity:
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+          - labelSelector:
+              matchExpressions:
+              - key: virtualCluster-app
+                operator: In
+                values:
+                - apiserver
+            topologyKey: kubernetes.io/hostname
+      priorityClassName: system-node-critical
+      volumes:
+      - name: apiserver-cert
+        secret:
+          secretName: {{ .VirtualClusterCertsSecret }}
+      - name: etcd-cert
+        secret:
+          secretName: {{ .EtcdCertsSecret }}
+`
+)
diff --git a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_service.go b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_service.go
new file mode 100644
index 000000000..ae1d0e592
--- /dev/null
+++ b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_service.go
@@ -0,0 +1,23 @@
+package apiserver
+
+const (
+	ApiserverService = `
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    virtualCluster-app: apiserver
+    app.kubernetes.io/managed-by: virtual-cluster-controller
+  name: {{ .ServiceName }}
+  namespace: {{ .Namespace }}
+spec:
+  ports:
+  - name: client
+    port: 443
+    protocol: TCP
+    targetPort: 5443
+  selector:
+    virtualCluster-app: apiserver
+  type: {{ .ServiceType }}
+`
+)
diff --git a/pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go b/pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go
new file mode 100644
index 000000000..17b9b3378
--- /dev/null
+++ b/pkg/kubenest/manifest/controlplane/etcd/mainfests_deployment.go
@@ -0,0 +1,97 @@
+package etcd
+
+const (
+	// EtcdStatefulSet is etcd StatefulSet manifest
+	EtcdStatefulSet = `
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  labels:
+    virtualCluster-app: etcd
+    app.kubernetes.io/managed-by: virtual-cluster-controller
+  namespace: {{ .Namespace }}
+  name: {{ .StatefulSetName }}
+spec:
+  replicas: {{ .Replicas }}
+  serviceName: {{ .StatefulSetName }}
+  podManagementPolicy: Parallel
+  selector:
+    matchLabels:
+      virtualCluster-app: etcd
+  template:
+    metadata:
+      labels:
+        virtualCluster-app: etcd
+    spec:
+      automountServiceAccountToken: false
+      tolerations:
+      - operator: Exists
+      containers:
+      - name: etcd
+        image: {{ .ImageRepository }}/etcd:{{ .Version }}
+        imagePullPolicy: IfNotPresent
+        command:
+        - /usr/local/bin/etcd
+        - --name=$(VIRTUAL_ETCD_NAME)
+        - --listen-client-urls=https://0.0.0.0:{{ .EtcdListenClientPort }}
+        - --listen-peer-urls=http://0.0.0.0:{{ .EtcdListenPeerPort }}
+        - --advertise-client-urls=https://{{ .EtcdClientService }}.{{ .Namespace }}.svc.cluster.local:{{ .EtcdListenClientPort }}
+        - --initial-cluster={{ .InitialCluster }}
+        - --initial-cluster-state=new
+        - --client-cert-auth=true
+        - --trusted-ca-file=/etc/virtualcluster/pki/etcd/etcd-ca.crt
+        - --cert-file=/etc/virtualcluster/pki/etcd/etcd-server.crt
+        - --key-file=/etc/virtualcluster/pki/etcd/etcd-server.key
+        - --data-dir=/var/lib/etcd
+        - --snapshot-count=10000
+        - --log-level=debug
+        - --cipher-suites={{ .EtcdCipherSuites }}
+        #- --peer-cert-file=/etc/virtualcluster/pki/etcd/etcd-server.crt
+        #- --peer-client-cert-auth=true
+        #- --peer-key-file=/etc/virtualcluster/pki/etcd/etcd-server.key
+        #- --peer-trusted-ca-file=/etc/virtualcluster/pki/etcd/etcd-ca.crt
+        env:
+        - name: VIRTUAL_ETCD_NAME
+          valueFrom:
+            fieldRef:
+              apiVersion: v1
+              fieldPath: metadata.name
+        livenessProbe:
+          exec:
+            command:
+            - /bin/sh
+            - -ec
+            - etcdctl get /registry --prefix --keys-only --endpoints https://127.0.0.1:{{ .EtcdListenClientPort }} --cacert=/etc/virtualcluster/pki/etcd/etcd-ca.crt --cert=/etc/virtualcluster/pki/etcd/etcd-server.crt --key=/etc/virtualcluster/pki/etcd/etcd-server.key
+          failureThreshold: 3
+          initialDelaySeconds: 600
+          periodSeconds: 60
+          successThreshold: 1
+          timeoutSeconds: 10
+        ports:
+        - containerPort: {{ .EtcdListenClientPort }}
+          name: client
+          protocol: TCP
+        - containerPort: {{ .EtcdListenPeerPort }}
+          name: server
+          protocol: TCP
+        volumeMounts:
+        - mountPath: /var/lib/etcd
+          name: {{ .EtcdDataVolumeName }}
+        - mountPath: /etc/virtualcluster/pki/etcd
+          name: etcd-cert
+      volumes:
+      - name: etcd-cert
+        secret:
+          secretName: {{ .CertsSecretName }}
+  volumeClaimTemplates:
+  - metadata:
+      name: {{ .EtcdDataVolumeName }}
+    spec:
+      accessModes:
+      - ReadWriteOnce
+      resources:
+        requests:
+          storage: 1Gi
+      storageClassName: openebs-hostpath
+`
+)
diff --git a/pkg/kubenest/manifest/controlplane/etcd/mainfests_service.go b/pkg/kubenest/manifest/controlplane/etcd/mainfests_service.go
new file mode 100644
index 000000000..5b568a427
--- /dev/null
+++ b/pkg/kubenest/manifest/controlplane/etcd/mainfests_service.go
@@ -0,0 +1,50 @@
+package etcd
+
+const (
+	// EtcdClientService is etcd client service manifest
+	EtcdClientService = `
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    virtualCluster-app: etcd
+    app.kubernetes.io/managed-by: virtual-cluster-controller
+  name: {{ .ServiceName }}
+  namespace: {{ .Namespace }}
+spec:
+  ports:
+  - name: client
+    port: {{ .EtcdListenClientPort }}
+    protocol: TCP
+    targetPort: {{ .EtcdListenClientPort }}
+  selector:
+    virtualCluster-app: etcd
+  type: ClusterIP
+`
+
+	// EtcdPeerService is etcd peer Service manifest
+	EtcdPeerService = `
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    virtualCluster-app: etcd
+    app.kubernetes.io/managed-by: virtual-cluster-controller
+  name: {{ .ServiceName }}
+  namespace: {{ .Namespace }}
+spec:
+  clusterIP: None
+  ports:
+  - name: client
+    port: {{ .EtcdListenClientPort }}
+    protocol: TCP
+    targetPort: {{ .EtcdListenClientPort }}
+  - name: server
+    port: {{ .EtcdListenPeerPort }}
+    protocol: TCP
+    targetPort: {{ .EtcdListenPeerPort }}
+  selector:
+    virtualCluster-app: etcd
+  type: ClusterIP
+`
+)
diff --git a/pkg/kubenest/manifest/controlplane/kube-controller/manifests_deployment.go b/pkg/kubenest/manifest/controlplane/kube-controller/manifests_deployment.go
new file mode 100644
index 000000000..e88d8eda9
--- /dev/null
+++ b/pkg/kubenest/manifest/controlplane/kube-controller/manifests_deployment.go
@@ -0,0 +1,84 @@
+package kube_controller
+
+const (
+	// KubeControllerManagerDeployment is the KubeControllerManager deployment manifest
+	KubeControllerManagerDeployment = `
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ .DeploymentName }}
+  namespace: {{ .Namespace }}
+  labels:
+    virtualCluster-app: kube-controller-manager
+    app.kubernetes.io/managed-by: virtual-cluster-controller
+spec:
+  replicas: {{ .Replicas }}
+  selector:
+    matchLabels:
+      virtualCluster-app: kube-controller-manager
+  template:
+    metadata:
+      labels:
+        virtualCluster-app: kube-controller-manager
+    spec:
+      automountServiceAccountToken: false
+      priorityClassName: system-node-critical
+      affinity:
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+          - labelSelector:
+              matchExpressions:
+              - key: virtualCluster-app
+                operator: In
+                values: ["kube-controller-manager"]
+            topologyKey: kubernetes.io/hostname
+      containers:
+      - name: kube-controller-manager
+        image: {{ .ImageRepository }}/kube-controller-manager:{{ .Version }}
+        imagePullPolicy: IfNotPresent
+        command:
+        - kube-controller-manager
+        - --allocate-node-cidrs=true
+        - --kubeconfig=/etc/virtualcluster/kubeconfig
+        - --authentication-kubeconfig=/etc/virtualcluster/kubeconfig
+        - --authorization-kubeconfig=/etc/virtualcluster/kubeconfig
+        - --bind-address=0.0.0.0
+        - --client-ca-file=/etc/virtualcluster/pki/ca.crt
+        - --cluster-cidr=10.244.0.0/16
+        - --cluster-name=virtualcluster
+        - --cluster-signing-cert-file=/etc/virtualcluster/pki/ca.crt
+        - --cluster-signing-key-file=/etc/virtualcluster/pki/ca.key
+        - --controllers=*,namespace,garbagecollector,serviceaccount-token,ttl-after-finished,bootstrapsigner,csrapproving,csrcleaner,csrsigning,clusterrole-aggregation
+        - --leader-elect=true
+        - --node-cidr-mask-size=24
+        - --root-ca-file=/etc/virtualcluster/pki/ca.crt
+        - --service-account-private-key-file=/etc/virtualcluster/pki/virtualCluster.key
+        - --service-cluster-ip-range=10.96.0.0/12
+        - --use-service-account-credentials=true
+        - --v=4
+        livenessProbe:
+          failureThreshold: 8
+          httpGet:
+            path: /healthz
+            port: 10257
+            scheme: HTTPS
+          initialDelaySeconds: 10
+          periodSeconds: 10
+          successThreshold: 1
+          timeoutSeconds: 15
+        volumeMounts:
+        - name: virtualcluster-certs
+          mountPath: /etc/virtualcluster/pki
+          readOnly: true
+        - name: kubeconfig
+          mountPath: /etc/virtualcluster/kubeconfig
+          subPath: kubeconfig
+      volumes:
+      - name: virtualcluster-certs
+        secret:
+          secretName: {{ .VirtualClusterCertsSecret }}
+      - name: kubeconfig
+        secret:
+          secretName: {{ .KubeconfigSecret }}
+`
+)
diff --git a/pkg/kubenest/manifest/controlplane/rbac/manifests_rbac.go b/pkg/kubenest/manifest/controlplane/rbac/manifests_rbac.go
new file mode 100644
index 000000000..050e55a6e
--- /dev/null
+++ b/pkg/kubenest/manifest/controlplane/rbac/manifests_rbac.go
@@ -0,0 +1,173 @@
+package rbac
+
+const (
+	VirtualSchedulerSA = `
+apiVersion: v1
+kind: ServiceAccount +metadata: + name: virtualcluster-scheduler + namespace: {{ .Namespace }} +` + VirtualSchedulerRoleBinding = ` +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: virtual-scheduler +subjects: + - kind: ServiceAccount + name: virtualcluster-scheduler + namespace: {{ .Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: virtual-scheduler +` + VirtualSchedulerRole = ` +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: virtual-scheduler +rules: + - verbs: + - create + - patch + - update + apiGroups: + - '' + - events.k8s.io + resources: + - events + - verbs: + - create + apiGroups: + - coordination.k8s.io + resources: + - leases + - verbs: + - get + - update + apiGroups: + - coordination.k8s.io + resources: + - leases + resourceNames: + - virtualcluster-scheduler + - verbs: + - create + apiGroups: + - '' + resources: + - endpoints + - verbs: + - get + - update + apiGroups: + - '' + resources: + - endpoints + - verbs: + - get + - list + - watch + apiGroups: + - '' + resources: + - nodes + - verbs: + - delete + - get + - list + - watch + apiGroups: + - '' + resources: + - pods + - verbs: + - create + apiGroups: + - '' + resources: + - bindings + - pods/binding + - verbs: + - patch + - update + apiGroups: + - '' + resources: + - pods/status + - verbs: + - get + - list + - watch + apiGroups: + - '' + resources: + - replicationcontrollers + - services + - verbs: + - get + - list + - watch + apiGroups: + - apps + - extensions + resources: + - replicasets + - verbs: + - get + - list + - watch + apiGroups: + - apps + resources: + - statefulsets + - verbs: + - get + - list + - watch + apiGroups: + - policy + resources: + - poddisruptionbudgets + - verbs: + - get + - list + - watch + - update + apiGroups: + - '' + resources: + - persistentvolumeclaims + - persistentvolumes + - verbs: + - create + apiGroups: + - authentication.k8s.io + resources: + - tokenreviews + - verbs: + - create + apiGroups: + - authorization.k8s.io + resources: + - subjectaccessreviews + - verbs: + - get + - list + - watch + apiGroups: + - storage.k8s.io + resources: + - '*' + - verbs: + - get + - list + - watch + apiGroups: + - '' + resources: + - configmaps + - namespaces +` +) diff --git a/pkg/kubenest/manifest/controlplane/scheduler/manifest_configmap.go b/pkg/kubenest/manifest/controlplane/scheduler/manifest_configmap.go new file mode 100644 index 000000000..9225def24 --- /dev/null +++ b/pkg/kubenest/manifest/controlplane/scheduler/manifest_configmap.go @@ -0,0 +1,53 @@ +package kube_controller + +const ( + VirtualClusterSchedulerConfigMap = ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: scheduler-config + namespace: {{ .Namespace }} +data: + scheduler-config.yaml: | + apiVersion: kubescheduler.config.k8s.io/v1 + kind: KubeSchedulerConfiguration + leaderElection: + leaderElect: true + resourceName: {{ .DeploymentName }} + resourceNamespace: kube-system + clientConnection: + kubeconfig: /etc/virtualcluster/kubeconfig + profiles: + - schedulerName: default-scheduler + plugins: + preFilter: + disabled: + - name: "VolumeBinding" + enabled: + - name: "LeafNodeVolumeBinding" + filter: + disabled: + - name: "VolumeBinding" + - name: "TaintToleration" + enabled: + - name: "LeafNodeTaintToleration" + - name: "LeafNodeVolumeBinding" + score: + disabled: + - name: "VolumeBinding" + reserve: + disabled: + - name: "VolumeBinding" + enabled: + - name: "LeafNodeVolumeBinding" + preBind: + disabled: + - name: 
"VolumeBinding" + enabled: + - name: "LeafNodeVolumeBinding" + pluginConfig: + - name: LeafNodeVolumeBinding + args: + bindTimeoutSeconds: 5 +` +) diff --git a/pkg/kubenest/manifest/controlplane/scheduler/manifest_deployment.go b/pkg/kubenest/manifest/controlplane/scheduler/manifest_deployment.go new file mode 100644 index 000000000..0b9246f89 --- /dev/null +++ b/pkg/kubenest/manifest/controlplane/scheduler/manifest_deployment.go @@ -0,0 +1,62 @@ +package kube_controller + +const ( + VirtualClusterSchedulerDeployment = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .DeploymentName }} + namespace: {{ .Namespace }} + labels: + virtualCluster-app: scheduler + app.kubernetes.io/managed-by: virtual-cluster-controller +spec: + replicas: {{ .Replicas }} + selector: + matchLabels: + virtualCluster-app: scheduler + template: + metadata: + labels: + virtualCluster-app: scheduler + spec: + automountServiceAccountToken: false + tolerations: + - key: node-role.kubernetes.io/master + operator: Exists + containers: + - name: scheduler + image: {{ .ImageRepository }}/scheduler:{{ .Version }} + imagePullPolicy: IfNotPresent + command: + - scheduler + - --config=/etc/kubernetes/kube-scheduler/scheduler-config.yaml + - --authentication-kubeconfig=/etc/virtualcluster/kubeconfig + - --authorization-kubeconfig=/etc/virtualcluster/kubeconfig + - --v=4 + livenessProbe: + httpGet: + path: /healthz + port: 10259 + scheme: HTTPS + failureThreshold: 3 + initialDelaySeconds: 15 + periodSeconds: 15 + timeoutSeconds: 5 + volumeMounts: + - name: kubeconfig + subPath: kubeconfig + mountPath: /etc/virtualcluster/kubeconfig + - name: scheduler-config + readOnly: true + mountPath: /etc/kubernetes/kube-scheduler + volumes: + - name: kubeconfig + secret: + secretName: {{ .KubeconfigSecret }} + - name: scheduler-config + configMap: + defaultMode: 420 + name: scheduler-config +` +) diff --git a/pkg/treeoperator/manifest/manifest_deployments.go b/pkg/kubenest/manifest/kosmos/manifest_deployment.go similarity index 94% rename from pkg/treeoperator/manifest/manifest_deployments.go rename to pkg/kubenest/manifest/kosmos/manifest_deployment.go index c69163e01..b2a83eeec 100644 --- a/pkg/treeoperator/manifest/manifest_deployments.go +++ b/pkg/kubenest/manifest/kosmos/manifest_deployment.go @@ -1,7 +1,6 @@ -package manifest +package kosmos const ( - DefaultKubeconfigPath = "/etc/cluster-tree/cert" ClusterTreeClusterManagerDeployment = `--- apiVersion: apps/v1 kind: Deployment diff --git a/pkg/treeoperator/manifest/manifest_secrets.go b/pkg/kubenest/manifest/kosmos/manifest_secret.go similarity index 95% rename from pkg/treeoperator/manifest/manifest_secrets.go rename to pkg/kubenest/manifest/kosmos/manifest_secret.go index ae7424a75..4be4420f7 100644 --- a/pkg/treeoperator/manifest/manifest_secrets.go +++ b/pkg/kubenest/manifest/kosmos/manifest_secret.go @@ -1,4 +1,4 @@ -package manifest +package kosmos const ( ClusterTreeClusterManagerSecret = `--- diff --git a/pkg/kubenest/tasks/apiserver.go b/pkg/kubenest/tasks/apiserver.go new file mode 100644 index 000000000..d98fe2340 --- /dev/null +++ b/pkg/kubenest/tasks/apiserver.go @@ -0,0 +1,82 @@ +package tasks + +import ( + "errors" + "fmt" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" + apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +var ( + virtualClusterApiserverLabels 
= labels.Set{constants.Label: constants.ApiServer}
+)
+
+func NewVirtualClusterApiserverTask() workflow.Task {
+	return workflow.Task{
+		Name:        "apiserver",
+		Run:         runApiserver,
+		RunSubTasks: true,
+		Tasks: []workflow.Task{
+			{
+				Name: "deploy-apiserver",
+				Run:  runVirtualClusterAPIServer,
+			},
+			{
+				Name: "check-apiserver",
+				Run:  runCheckVirtualClusterAPIServer,
+			},
+		},
+	}
+}
+
+func runApiserver(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("apiserver task invoked with an invalid data struct")
+	}
+
+	klog.V(4).InfoS("[apiserver] Running apiserver task", "virtual cluster", klog.KObj(data))
+	return nil
+}
+
+func runVirtualClusterAPIServer(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("virtual cluster apiserver task invoked with an invalid data struct")
+	}
+
+	err := controlplane.EnsureVirtualClusterAPIServer(
+		data.RemoteClient(),
+		data.GetName(),
+		data.GetNamespace(),
+	)
+	if err != nil {
+		return fmt.Errorf("failed to install virtual cluster apiserver component, err: %w", err)
+	}
+
+	klog.V(2).InfoS("[VirtualClusterApiserver] Successfully installed virtual cluster apiserver component", "virtual cluster", klog.KObj(data))
+	return nil
+}
+
+func runCheckVirtualClusterAPIServer(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("check-VirtualClusterAPIServer task invoked with an invalid data struct")
+	}
+
+	checker := apiclient.NewVirtualClusterChecker(data.RemoteClient(), constants.ComponentBeReadyTimeout)
+
+	err := checker.WaitForSomePods(virtualClusterApiserverLabels.String(), data.GetNamespace(), 1)
+	if err != nil {
+		return fmt.Errorf("checking for virtual cluster apiserver to ready timeout, err: %w", err)
+	}
+
+	klog.V(2).InfoS("[check-VirtualClusterAPIServer] the virtual cluster apiserver is ready", "virtual cluster", klog.KObj(data))
+	return nil
+}
diff --git a/pkg/kubenest/tasks/cert.go b/pkg/kubenest/tasks/cert.go
new file mode 100644
index 000000000..9f24b316a
--- /dev/null
+++ b/pkg/kubenest/tasks/cert.go
@@ -0,0 +1,144 @@
+package tasks
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/klog/v2"
+
+	"github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
+)
+
+func NewCertTask() workflow.Task {
+	return workflow.Task{
+		Name:        "Certs",
+		Run:         runCerts,
+		Skip:        skipCerts,
+		RunSubTasks: true,
+		Tasks:       newCertSubTasks(),
+	}
+}
+
+func runCerts(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("certs task invoked with an invalid data struct")
+	}
+	klog.V(4).InfoS("[certs] Running certs task", "virtual cluster", klog.KObj(data))
+	return nil
+}
+
+func skipCerts(d workflow.RunData) (bool, error) {
+	data, ok := d.(InitData)
+	if !ok {
+		return false, errors.New("certs task invoked with an invalid data struct")
+	}
+
+	secretName := fmt.Sprintf("%s-%s", data.GetName(), "cert")
+	secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), secretName, metav1.GetOptions{})
+	if err != nil {
+		return false, nil
+	}
+
+	if err := data.LoadCertFromSecret(secret); err != nil {
+		return false, err
+	}
+
+	klog.V(4).InfoS("[certs] Successfully loaded certs from secret", "secret", secret.Name, "virtual cluster", klog.KObj(data))
+	klog.V(2).InfoS("[certs] Skip certs task, found previous certificates in secret", "virtual cluster", klog.KObj(data))
+	return true, nil
+}
+
+func newCertSubTasks() []workflow.Task {
+	var subTasks []workflow.Task
+	caCert := map[string]*cert.CertConfig{}
+
+	for _, cert := range cert.GetDefaultCertList() {
+		var task workflow.Task
+		if cert.CAName == "" {
+			task = workflow.Task{Name: cert.Name, Run: runCATask(cert)}
+			caCert[cert.Name] = cert
+		} else {
+			task = workflow.Task{Name: cert.Name, Run: runCertTask(cert, caCert[cert.CAName])}
+		}
+
+		subTasks = append(subTasks, task)
+	}
+
+	return subTasks
+}
+
+func runCertTask(cc, caCert *cert.CertConfig) func(d workflow.RunData) error {
+	return func(r workflow.RunData) error {
+		data, ok := r.(InitData)
+		if !ok {
+			return fmt.Errorf("certs task invoked with an invalid data struct")
+		}
+
+		if caCert == nil {
+			return fmt.Errorf("unexpected empty ca cert for %s", cc.Name)
+		}
+
+		if cc.CAName != caCert.Name {
+			return fmt.Errorf("cert %s expects CA %s, but CA %s was provided", cc.Name, cc.CAName, caCert.Name)
+		}
+
+		if err := mutateCertConfig(data, cc); err != nil {
+			return fmt.Errorf("error when mutating cert altNames for %s, err: %w", cc.Name, err)
+		}
+
+		caCert := data.GetCert(cc.CAName)
+		cert, err := cert.CreateCertAndKeyFilesWithCA(cc, caCert.CertData(), caCert.KeyData())
+		if err != nil {
+			return err
+		}
+
+		data.AddCert(cert)
+
+		klog.V(2).InfoS("[certs] Successfully generated certificate", "certName", cc.Name, "caName", cc.CAName)
+		return nil
+	}
+}
+
+func runCATask(kc *cert.CertConfig) func(d workflow.RunData) error {
+	return func(r workflow.RunData) error {
+		data, ok := r.(InitData)
+		if !ok {
+			return errors.New("certs task invoked with an invalid data struct")
+		}
+
+		if kc.CAName != "" {
+			return fmt.Errorf("this function should only be used for CAs, but cert %s has CA %s", kc.Name, kc.CAName)
+		}
+		klog.V(4).InfoS("[certs] Creating a new certificate authority", "certName", kc.Name)
+
+		cert, err := cert.NewCertificateAuthority(kc)
+		if err != nil {
+			return err
+		}
+
+		klog.V(2).InfoS("[certs] Successfully generated ca certificate", "certName", kc.Name)
+
+		data.AddCert(cert)
+		return nil
+	}
+}
+
+func mutateCertConfig(data InitData, cc *cert.CertConfig) error {
+	if cc.AltNamesMutatorFunc != nil {
+		err := cc.AltNamesMutatorFunc(&cert.AltNamesMutatorConfig{
+			Name:             data.GetName(),
+			Namespace:        data.GetNamespace(),
+			ControlplaneAddr: data.ControlplaneAddress(),
+			ClusterIps:       data.ServiceClusterIp(),
+		}, cc)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/kubenest/tasks/check.go b/pkg/kubenest/tasks/check.go
new file mode 100644
index 000000000..3363b6bc4
--- /dev/null
+++ b/pkg/kubenest/tasks/check.go
@@ -0,0 +1,87 @@
+package tasks
+
+import (
+	"errors"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/klog/v2"
+
+	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+	apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
+)
+
+var (
+	kubeControllerManagerLabels = labels.Set{"virtualCluster-app": constants.KubeControllerManager}
+	virtualClusterManagerLabels = labels.Set{"virtualCluster-app": constants.VirtualClusterScheduler}
+)
+
+func NewCheckApiserverHealthTask() workflow.Task {
+	return workflow.Task{
+		Name: "check-apiserver-health",
+		Run:  runCheckApiserver,
+	}
+}
+
+func NewCheckControlPlaneTask() workflow.Task {
+	return workflow.Task{
+		Name:        "check-controlPlane-health",
+		Run:         runCheckControlPlane,
+		RunSubTasks: true,
+		Tasks: []workflow.Task{
+			newCheckControlPlaneSubTask("KubeControllerManager", kubeControllerManagerLabels),
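+			// NOTE: these label sets must stay in sync with the pod labels set in the
+			// kube-controller-manager and scheduler deployment manifests.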
newCheckControlPlaneSubTask("VirtualClusterScheduler", virtualClusterManagerLabels), + }, + } +} + +func newCheckControlPlaneSubTask(component string, ls labels.Set) workflow.Task { + return workflow.Task{ + Name: component, + Run: runCheckControlPlaneSubTask(component, ls), + } +} + +func runCheckApiserver(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return fmt.Errorf("check-apiserver-health task invoked with an invalid data struct") + } + klog.V(4).InfoS("[check-apiserver-health] Running task", "virtual cluster", klog.KObj(data)) + + checker := apiclient.NewVirtualClusterChecker(data.RemoteClient(), constants.ComponentBeReadyTimeout) + + if err := apiclient.TryRunCommand(checker.WaitForAPI, 3); err != nil { + return fmt.Errorf("the virtual cluster apiserver is unhealthy, err: %w", err) + } + klog.V(2).InfoS("[check-apiserver-health] the etcd and virtualCluster-apiserver is healthy", "virtual cluster", klog.KObj(data)) + return nil +} + +func runCheckControlPlane(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("check-controlPlane task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[check-controlPlane] Running wait-controlPlane task", "virtual cluster", klog.KObj(data)) + return nil +} + +func runCheckControlPlaneSubTask(component string, ls labels.Set) func(r workflow.RunData) error { + return func(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("check-controlPlane task invoked with an invalid data struct") + } + + checker := apiclient.NewVirtualClusterChecker(data.RemoteClient(), constants.ComponentBeReadyTimeout) + if err := checker.WaitForSomePods(ls.String(), data.GetNamespace(), 1); err != nil { + return fmt.Errorf("checking for %s to ready timeout, err: %w", component, err) + } + + klog.V(2).InfoS("[check-ControlPlane] component status is ready", "component", component, "virtual cluster", klog.KObj(data)) + return nil + } +} diff --git a/pkg/kubenest/tasks/component.go b/pkg/kubenest/tasks/component.go new file mode 100644 index 000000000..57c1253e1 --- /dev/null +++ b/pkg/kubenest/tasks/component.go @@ -0,0 +1,63 @@ +package tasks + +import ( + "errors" + "fmt" + + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +func NewComponentTask() workflow.Task { + return workflow.Task{ + Name: "components", + Run: runComponents, + RunSubTasks: true, + Tasks: []workflow.Task{ + newComponentSubTask(constants.KubeControllerManagerComponent), + newComponentSubTask(constants.VirtualClusterSchedulerComponent), + }, + } +} + +func runComponents(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("components task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[components] Running components task", "virtual cluster", klog.KObj(data)) + return nil +} + +func newComponentSubTask(component string) workflow.Task { + return workflow.Task{ + Name: component, + Run: runComponentSubTask(component), + } +} + +func runComponentSubTask(component string) func(r workflow.RunData) error { + return func(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("components task invoked with an invalid data struct") + } + + err := controlplane.EnsureControlPlaneComponent( + component, + data.GetName(), + data.GetNamespace(), + data.RemoteClient(), + ) + if err != nil { + return 
fmt.Errorf("failed to apply component %s, err: %w", component, err) + } + + klog.V(2).InfoS("[components] Successfully applied component", "component", component, "virtual cluster", klog.KObj(data)) + return nil + } +} diff --git a/pkg/kubenest/tasks/data.go b/pkg/kubenest/tasks/data.go new file mode 100644 index 000000000..b813d9352 --- /dev/null +++ b/pkg/kubenest/tasks/data.go @@ -0,0 +1,20 @@ +package tasks + +import ( + clientset "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + "github.com/kosmos.io/kosmos/pkg/kubenest/util/cert" +) + +type InitData interface { + cert.CertStore + GetName() string + GetNamespace() string + ControlplaneAddress() string + ServiceClusterIp() []string + RemoteClient() clientset.Interface + KosmosClient() versioned.Interface + DataDir() string + VirtualClusterVersion() string +} diff --git a/pkg/kubenest/tasks/etcd.go b/pkg/kubenest/tasks/etcd.go new file mode 100644 index 000000000..6d6398226 --- /dev/null +++ b/pkg/kubenest/tasks/etcd.go @@ -0,0 +1,77 @@ +package tasks + +import ( + "errors" + "fmt" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" + apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +var ( + etcdLabels = labels.Set{constants.Label: constants.Etcd} +) + +func NewEtcdTask() workflow.Task { + return workflow.Task{ + Name: "Etcd", + Run: runEtcd, + RunSubTasks: true, + Tasks: []workflow.Task{ + { + Name: "deploy-etcd", + Run: runDeployEtcd, + }, + { + Name: "check-etcd", + Run: runCheckEtcd, + }, + }, + } +} + +func runEtcd(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("etcd task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[etcd] Running etcd task", "virtual cluster", klog.KObj(data)) + return nil +} + +func runDeployEtcd(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("deploy-etcd task invoked with an invalid data struct") + } + + err := controlplane.EnsureVirtualClusterEtcd(data.RemoteClient(), data.GetName(), data.GetNamespace()) + if err != nil { + return fmt.Errorf("failed to install etcd component, err: %w", err) + } + + klog.V(2).InfoS("[deploy-etcd] Successfully installed etcd component", "virtual cluster", klog.KObj(data)) + return nil +} + +func runCheckEtcd(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("check-etcd task invoked with an invalid data struct") + } + + checker := apiclient.NewVirtualClusterChecker(data.RemoteClient(), constants.ComponentBeReadyTimeout) + + if err := checker.WaitForSomePods(etcdLabels.String(), data.GetNamespace(), 1); err != nil { + return fmt.Errorf("checking for virtual cluster etcd to ready timeout, err: %w", err) + } + + klog.V(2).InfoS("[check-etcd] the etcd pods is ready", "virtual cluster", klog.KObj(data)) + return nil +} diff --git a/pkg/kubenest/tasks/service.go b/pkg/kubenest/tasks/service.go new file mode 100644 index 000000000..82fc655fd --- /dev/null +++ b/pkg/kubenest/tasks/service.go @@ -0,0 +1,54 @@ +package tasks + +import ( + "errors" + "fmt" + + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +func NewVirtualClusterServiceTask() workflow.Task { + return workflow.Task{ + Name: "virtual-service", + Run: runService, + 
+		RunSubTasks: true,
+		Tasks: []workflow.Task{
+			{
+				Name: "virtual-service",
+				Run:  runVirtualClusterService,
+			},
+		},
+	}
+}
+
+func runService(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("service task invoked with an invalid data struct")
+	}
+
+	klog.V(4).InfoS("[service] Running service task", "virtual cluster", klog.KObj(data))
+	return nil
+}
+
+func runVirtualClusterService(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("virtual service task invoked with an invalid data struct")
+	}
+
+	err := controlplane.EnsureVirtualClusterService(
+		data.RemoteClient(),
+		data.GetName(),
+		data.GetNamespace(),
+	)
+	if err != nil {
+		return fmt.Errorf("failed to install virtual cluster service, err: %w", err)
+	}
+
+	klog.V(2).InfoS("[Virtual Cluster Service] Successfully installed virtual cluster service", "virtual cluster", klog.KObj(data))
+	return nil
+}
diff --git a/pkg/kubenest/tasks/upload.go b/pkg/kubenest/tasks/upload.go
new file mode 100644
index 000000000..1b3280aaf
--- /dev/null
+++ b/pkg/kubenest/tasks/upload.go
@@ -0,0 +1,240 @@
+package tasks
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
+	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
+	"k8s.io/klog/v2"
+
+	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
+)
+
+var (
+	VirtualClusterControllerLabel = labels.Set{constants.VirtualClusterLabelKeyName: constants.VirtualClusterController}
+)
+
+func NewUploadCertsTask() workflow.Task {
+	return workflow.Task{
+		Name:        "Upload-Certs",
+		Run:         runUploadCerts,
+		RunSubTasks: true,
+		Tasks: []workflow.Task{
+			{
+				Name: "Upload-VirtualClusterCert",
+				Run:  runUploadVirtualClusterCert,
+			},
+			{
+				Name: "Upload-EtcdCert",
+				Run:  runUploadEtcdCert,
+			},
+		},
+	}
+}
+
+func NewUploadKubeconfigTask() workflow.Task {
+	return workflow.Task{
+		Name:        "upload-config",
+		RunSubTasks: true,
+		Run:         runUploadKubeconfig,
+		Tasks: []workflow.Task{
+			{
+				Name: "UploadAdminKubeconfig",
+				Run:  runUploadAdminKubeconfig,
+			},
+		},
+	}
+}
+
+func runUploadCerts(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("upload-certs task invoked with an invalid data struct")
+	}
+	klog.V(4).InfoS("[upload-certs] Running upload-certs task", "virtual cluster", klog.KObj(data))
+
+	if len(data.CertList()) == 0 {
+		return errors.New("there are no certs in store, please reload certs to store")
+	}
+	return nil
+}
+
+func runUploadKubeconfig(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("upload-config task invoked with an invalid data struct")
+	}
+
+	klog.V(4).InfoS("[upload-config] Running task", "virtual cluster", klog.KObj(data))
+	return nil
+}
+
+func runUploadVirtualClusterCert(r workflow.RunData) error {
+	data, ok := r.(InitData)
+	if !ok {
+		return errors.New("upload-VirtualClusterCert task invoked with an invalid data struct")
+	}
+
+	certList := data.CertList()
+	certsData := make(map[string][]byte, len(certList))
+	for _, c := range certList {
+		certsData[c.KeyName()] = c.KeyData()
+		certsData[c.CertName()] = c.CertData()
+	}
+
+	err := 
createOrUpdateSecret(data.RemoteClient(), &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", data.GetName(), "cert"), + Namespace: data.GetNamespace(), + Labels: VirtualClusterControllerLabel, + }, + Data: certsData, + }) + if err != nil { + return fmt.Errorf("failed to upload virtual cluster cert to secret, err: %w", err) + } + + klog.V(2).InfoS("[upload-VirtualClusterCert] Successfully uploaded virtual cluster certs to secret", "virtual cluster", klog.KObj(data)) + return nil +} + +func runUploadEtcdCert(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("upload-etcdCert task invoked with an invalid data struct") + } + + ca := data.GetCert(constants.EtcdCaCertAndKeyName) + server := data.GetCert(constants.EtcdServerCertAndKeyName) + client := data.GetCert(constants.EtcdClientCertAndKeyName) + + err := createOrUpdateSecret(data.RemoteClient(), &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: data.GetNamespace(), + Name: fmt.Sprintf("%s-%s", data.GetName(), "etcd-cert"), + Labels: VirtualClusterControllerLabel, + }, + + Data: map[string][]byte{ + ca.CertName(): ca.CertData(), + ca.KeyName(): ca.KeyData(), + server.CertName(): server.CertData(), + server.KeyName(): server.KeyData(), + client.CertName(): client.CertData(), + client.KeyName(): client.KeyData(), + }, + }) + if err != nil { + return fmt.Errorf("failed to upload etcd certs to secret, err: %w", err) + } + + klog.V(2).InfoS("[upload-etcdCert] Successfully uploaded etcd certs to secret", "virtual cluster", klog.KObj(data)) + return nil +} + +func createOrUpdateSecret(client clientset.Interface, secret *corev1.Secret) error { + _, err := client.CoreV1().Secrets(secret.GetNamespace()).Create(context.TODO(), secret, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + + _, err := client.CoreV1().Secrets(secret.GetNamespace()).Update(context.TODO(), secret, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + klog.V(5).InfoS("Successfully created or updated secret", "secret", secret.GetName()) + return nil +} + +func runUploadAdminKubeconfig(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("UploadAdminKubeconfig task invoked with an invalid data struct") + } + + var endpoint string + service, err := data.RemoteClient().CoreV1().Services(data.GetNamespace()).Get(context.TODO(), fmt.Sprintf("%s-%s", data.GetName(), "apiserver"), metav1.GetOptions{}) + if err != nil { + return err + } + nodePort := getNodePortFromAPIServerService(service) + endpoint = fmt.Sprintf("https://%s:%d", data.ControlplaneAddress(), nodePort) + + kubeconfig, err := buildKubeConfigFromSpec(data, endpoint) + if err != nil { + return err + } + + configBytes, err := clientcmd.Write(*kubeconfig) + if err != nil { + return err + } + + err = createOrUpdateSecret(data.RemoteClient(), &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: data.GetNamespace(), + Name: fmt.Sprintf("%s-%s", data.GetName(), "admin-config"), + Labels: VirtualClusterControllerLabel, + }, + Data: map[string][]byte{"kubeconfig": configBytes}, + }) + if err != nil { + return fmt.Errorf("failed to create secret of kubeconfig, err: %w", err) + } + + klog.V(2).InfoS("[UploadAdminKubeconfig] Successfully created secret of virtual cluster apiserver kubeconfig", "virtual cluster", klog.KObj(data)) + return nil +} + +func getNodePortFromAPIServerService(service *corev1.Service) int32 { + var nodePort int32 + if service.Spec.Type 
== corev1.ServiceTypeNodePort {
+		for _, port := range service.Spec.Ports {
+			if port.Name != "client" {
+				continue
+			}
+			nodePort = port.NodePort
+		}
+	}
+
+	return nodePort
+}
+
+func buildKubeConfigFromSpec(data InitData, serverURL string) (*clientcmdapi.Config, error) {
+	ca := data.GetCert(constants.CaCertAndKeyName)
+	if ca == nil {
+		return nil, errors.New("unable to build virtual cluster admin kubeconfig, CA cert is empty")
+	}
+
+	cc := cert.VirtualClusterCertClient()
+
+	if err := mutateCertConfig(data, cc); err != nil {
+		return nil, fmt.Errorf("error when mutating cert altNames for %s, err: %w", cc.Name, err)
+	}
+	client, err := cert.CreateCertAndKeyFilesWithCA(cc, ca.CertData(), ca.KeyData())
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate virtual cluster apiserver client certificate for kubeconfig, err: %w", err)
+	}
+
+	return util.CreateWithCerts(
+		serverURL,
+		constants.ClusterName,
+		constants.UserName,
+		ca.CertData(),
+		client.KeyData(),
+		client.CertData(),
+	), nil
+}
diff --git a/pkg/kubenest/util/address.go b/pkg/kubenest/util/address.go
new file mode 100644
index 000000000..2061a6e14
--- /dev/null
+++ b/pkg/kubenest/util/address.go
@@ -0,0 +1,84 @@
+package util
+
+import (
+	"context"
+	"fmt"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	clientset "k8s.io/client-go/kubernetes"
+	netutils "k8s.io/utils/net"
+
+	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+)
+
+func GetAPIServiceIP(clientset clientset.Interface) (string, error) {
+	nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+	if err != nil || len(nodes.Items) == 0 {
+		return "", fmt.Errorf("there are no nodes in cluster, err: %w", err)
+	}
+
+	var (
+		masterLabel       = labels.Set{"node-role.kubernetes.io/master": ""}
+		controlplaneLabel = labels.Set{"node-role.kubernetes.io/control-plane": ""}
+	)
+	// first, prefer a master/control-plane node for the APIServer address; if there
+	// are none, fall back to the first node in the list.
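+	// NOTE: node.Status.Addresses[0] is used as-is; on most distributions this is
+	// the InternalIP, but the ordering of addresses is not guaranteed by the API.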
+ for _, node := range nodes.Items { + ls := labels.Set(node.GetLabels()) + + if masterLabel.AsSelector().Matches(ls) || controlplaneLabel.AsSelector().Matches(ls) { + if ip := netutils.ParseIPSloppy(node.Status.Addresses[0].Address); ip != nil { + return ip.String(), nil + } + } + } + return nodes.Items[0].Status.Addresses[0].Address, nil +} + +func GetAPIServiceClusterIp(namespace string, client clientset.Interface) (error, string) { + serviceLists, err := client.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err, "" + } + if serviceLists != nil { + for _, service := range serviceLists.Items { + if service.Spec.Type == constants.ServiceType { + return nil, service.Spec.ClusterIP + } + } + } + return nil, "" +} + +func GetServiceClusterIp(namespace string, client clientset.Interface) (error, []string) { + serviceLists, err := client.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err, nil + } + var clusterIps []string + if serviceLists != nil { + for _, service := range serviceLists.Items { + if service.Spec.ClusterIP != "" { + clusterIps = append(clusterIps, service.Spec.ClusterIP) + } + } + } + return nil, clusterIps +} + +func GetEtcdServiceClusterIp(namespace string, client clientset.Interface) (error, []string) { + serviceLists, err := client.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err, nil + } + var clusterIps []string + if serviceLists != nil { + for _, service := range serviceLists.Items { + if service.Spec.Type == constants.EtcdServiceType && service.Spec.ClusterIP != "" { + clusterIps = append(clusterIps, service.Spec.ClusterIP) + } + } + } + return nil, clusterIps +} diff --git a/pkg/kubenest/util/api-client/check.go b/pkg/kubenest/util/api-client/check.go new file mode 100644 index 000000000..8d4879b55 --- /dev/null +++ b/pkg/kubenest/util/api-client/check.go @@ -0,0 +1,94 @@ +package util + +import ( + "context" + "net/http" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" +) + +type Checker interface { + WaitForAPI() error + WaitForSomePods(label, namespace string, podNum int32) error +} + +type VirtualClusterChecker struct { + client clientset.Interface + timeout time.Duration +} + +func NewVirtualClusterChecker(client clientset.Interface, timeout time.Duration) Checker { + return &VirtualClusterChecker{ + client: client, + timeout: timeout, + } +} + +func (v *VirtualClusterChecker) WaitForSomePods(label, namespace string, podNum int32) error { + return wait.PollImmediate(constants.ApiServerCallRetryInterval, v.timeout, func() (bool, error) { + listOpts := metav1.ListOptions{LabelSelector: label} + pods, err := v.client.CoreV1().Pods(namespace).List(context.TODO(), listOpts) + if err != nil { + return false, nil + } + + if len(pods.Items) == 0 { + return false, nil + } + + var expected int32 + for _, pod := range pods.Items { + if isPodRunning(pod) { + expected++ + } + } + return expected >= podNum, nil + }) +} +func (w *VirtualClusterChecker) WaitForAPI() error { + return wait.PollImmediate(constants.ApiServerCallRetryInterval, w.timeout, func() (bool, error) { + healthStatus := 0 + w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus) + if healthStatus != http.StatusOK { + return false, 
nil + } + + return true, nil + }) +} + +func TryRunCommand(f func() error, failureThreshold int) error { + backoff := wait.Backoff{ + Duration: 5 * time.Second, + Factor: 2, // double the timeout for every failure + Steps: failureThreshold, + } + return wait.ExponentialBackoff(backoff, func() (bool, error) { + err := f() + if err != nil { + // Retry until the timeout + return false, nil + } + // The last f() call was a success, return cleanly + return true, nil + }) +} + +func isPodRunning(pod corev1.Pod) bool { + if pod.Status.Phase != corev1.PodRunning || pod.DeletionTimestamp != nil { + return false + } + + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { + return true + } + } + return false +} diff --git a/pkg/kubenest/util/cert/certs.go b/pkg/kubenest/util/cert/certs.go new file mode 100644 index 000000000..589e9847e --- /dev/null +++ b/pkg/kubenest/util/cert/certs.go @@ -0,0 +1,450 @@ +package cert + +import ( + "crypto" + "crypto/ecdsa" + "crypto/elliptic" + cryptorand "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "errors" + "fmt" + "math" + "math/big" + "net" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation" + certutil "k8s.io/client-go/util/cert" + "k8s.io/client-go/util/keyutil" + netutils "k8s.io/utils/net" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" +) + +type CertConfig struct { + Name string + CAName string + NotAfter *time.Time + PublicKeyAlgorithm x509.PublicKeyAlgorithm + Config certutil.Config + AltNamesMutatorFunc altNamesMutatorFunc +} + +type altNamesMutatorFunc func(*AltNamesMutatorConfig, *CertConfig) error + +type AltNamesMutatorConfig struct { + Name string + Namespace string + ControlplaneAddr string + ClusterIps []string +} + +func (config *CertConfig) defaultPublicKeyAlgorithm() { + if config.PublicKeyAlgorithm == x509.UnknownPublicKeyAlgorithm { + config.PublicKeyAlgorithm = x509.RSA + } +} + +func (config *CertConfig) defaultNotAfter() { + if config.NotAfter == nil { + notAfter := time.Now().Add(constants.CertificateValidity).UTC() + config.NotAfter = &notAfter + } +} + +func GetDefaultCertList() []*CertConfig { + return []*CertConfig{ + // virtual cluster cert config. + VirtualClusterCertRootCA(), + VirtualClusterCertAdmin(), + VirtualClusterCertApiserver(), + // front proxy cert config. + VirtualClusterCertFrontProxyCA(), + VirtualClusterFrontProxyClient(), + // ETCD cert config.
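+ // Each non-CA entry in this list references its signing CA via CAName, + // so the CA configs are listed ahead of the certs they sign.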
+ VirtualClusterCertEtcdCA(), + VirtualClusterCertEtcdServer(), + VirtualClusterCertEtcdClient(), + } +} + +func VirtualClusterCertEtcdCA() *CertConfig { + return &CertConfig{ + Name: constants.EtcdCaCertAndKeyName, + Config: certutil.Config{ + CommonName: "virtualcluster-etcd-ca", + }, + } +} + +func VirtualClusterCertEtcdServer() *CertConfig { + return &CertConfig{ + Name: constants.EtcdServerCertAndKeyName, + CAName: constants.EtcdCaCertAndKeyName, + Config: certutil.Config{ + CommonName: "virtualCluster-etcd-server", + Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + }, + AltNamesMutatorFunc: makeAltNamesMutator(etcdServerAltNamesMutator), + } +} + +func VirtualClusterCertEtcdClient() *CertConfig { + return &CertConfig{ + Name: constants.EtcdClientCertAndKeyName, + CAName: constants.EtcdCaCertAndKeyName, + Config: certutil.Config{ + CommonName: "virtualCluster-etcd-client", + Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + }, + } +} + +func VirtualClusterCertFrontProxyCA() *CertConfig { + return &CertConfig{ + Name: constants.FrontProxyCaCertAndKeyName, + Config: certutil.Config{ + CommonName: "front-proxy-ca", + }, + } +} + +func VirtualClusterFrontProxyClient() *CertConfig { + return &CertConfig{ + Name: constants.FrontProxyClientCertAndKeyName, + CAName: constants.FrontProxyCaCertAndKeyName, + Config: certutil.Config{ + CommonName: "front-proxy-client", + Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + }, + } +} + +func etcdServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, error) { + etcdClientServiceDNS := fmt.Sprintf("%s.%s.svc.cluster.local", fmt.Sprintf("%s-%s", cfg.Name, "etcd-client"), cfg.Namespace) + etcdPeerServiceDNS := fmt.Sprintf("*.%s.%s.svc.cluster.local", fmt.Sprintf("%s-%s", cfg.Name, "etcd"), cfg.Namespace) + + altNames := &certutil.AltNames{ + DNSNames: []string{"localhost", etcdClientServiceDNS, etcdPeerServiceDNS}, + IPs: []net.IP{net.IPv4(127, 0, 0, 1)}, + } + + if len(cfg.ClusterIps) > 0 { + for _, clusterIp := range cfg.ClusterIps { + appendSANsToAltNames(altNames, []string{clusterIp}) + } + } + return altNames, nil +} + +func VirtualClusterCertApiserver() *CertConfig { + return &CertConfig{ + Name: constants.ApiserverCertAndKeyName, + CAName: constants.CaCertAndKeyName, + Config: certutil.Config{ + CommonName: "virtualCluster-apiserver", + Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + }, + AltNamesMutatorFunc: makeAltNamesMutator(apiServerAltNamesMutator), + } +} + +func VirtualClusterCertRootCA() *CertConfig { + return &CertConfig{ + Name: constants.CaCertAndKeyName, + Config: certutil.Config{ + CommonName: "virtualCluster", + }, + } +} + +func VirtualClusterCertAdmin() *CertConfig { + return &CertConfig{ + Name: constants.VirtualClusterCertAndKeyName, + CAName: constants.CaCertAndKeyName, + Config: certutil.Config{ + CommonName: "system:admin", + Organization: []string{"system:masters"}, + Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + }, + AltNamesMutatorFunc: makeAltNamesMutator(apiServerAltNamesMutator), + } +} + +func makeAltNamesMutator(f func(cfg *AltNamesMutatorConfig) (*certutil.AltNames, error)) altNamesMutatorFunc { + return func(cfg *AltNamesMutatorConfig, cc *CertConfig) error { + altNames, err := f(cfg) + if err != nil { + return err + } + + cc.Config.AltNames = *altNames + return nil + } +} + +func apiServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, error) { + 
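+ // Collect the names and IPs a client may use to reach the apiserver: the + // standard in-cluster Kubernetes DNS names, the virtual cluster's + // namespace-scoped service names, the control-plane address, and any + // service ClusterIPs supplied through the mutator config.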
altNames := &certutil.AltNames{ + DNSNames: []string{ + "localhost", + "kubernetes", + "kubernetes.default", + "kubernetes.default.svc", + fmt.Sprintf("*.%s.svc.cluster.local", constants.VirtualClusterSystemNamespace), + fmt.Sprintf("*.%s.svc", constants.VirtualClusterSystemNamespace), + }, + // TODO: handle both the case where a node belongs to the current cluster and the case where it does not. + IPs: []net.IP{ + net.IPv4(127, 0, 0, 1), + net.IPv4(100, 71, 7, 52), + net.IPv4(10, 237, 6, 17), + }, + } + + if cfg.Namespace != constants.VirtualClusterSystemNamespace { + appendSANsToAltNames(altNames, []string{fmt.Sprintf("*.%s.svc.cluster.local", cfg.Namespace), + fmt.Sprintf("*.%s.svc", cfg.Namespace)}) + } + if len(cfg.ControlplaneAddr) > 0 { + appendSANsToAltNames(altNames, []string{cfg.ControlplaneAddr}) + } + if len(cfg.ClusterIps) > 0 { + for _, clusterIp := range cfg.ClusterIps { + appendSANsToAltNames(altNames, []string{clusterIp}) + } + } + return altNames, nil +} + +func appendSANsToAltNames(altNames *certutil.AltNames, SANs []string) { + for _, altname := range SANs { + if ip := netutils.ParseIPSloppy(altname); ip != nil { + altNames.IPs = append(altNames.IPs, ip) + } else if len(validation.IsDNS1123Subdomain(altname)) == 0 { + altNames.DNSNames = append(altNames.DNSNames, altname) + } else if len(validation.IsWildcardDNS1123Subdomain(altname)) == 0 { + altNames.DNSNames = append(altNames.DNSNames, altname) + } + } +} + +type VirtualClusterCert struct { + pairName string + caName string + cert []byte + key []byte +} + +// CertData returns certificate cert data. +func (cert *VirtualClusterCert) CertData() []byte { + return cert.cert +} + +// KeyData returns certificate key data. +func (cert *VirtualClusterCert) KeyData() []byte { + return cert.key +} + +// CertName returns the cert file name. Its default suffix is ".crt". +func (cert *VirtualClusterCert) CertName() string { + pair := cert.pairName + if len(pair) == 0 { + pair = "cert" + } + return pair + constants.CertExtension +} + +// KeyName returns the cert key file name. Its default suffix is ".key".
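+// For example, assuming the default extensions defined in constants (".crt" and +// ".key"), a cert whose pairName is "ca" is stored as "ca.crt" and "ca.key".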
+func (cert *VirtualClusterCert) KeyName() string { + pair := cert.pairName + if len(pair) == 0 { + pair = "cert" + } + return pair + constants.KeyExtension +} + +func NewCertificateAuthority(cc *CertConfig) (*VirtualClusterCert, error) { + cc.defaultPublicKeyAlgorithm() + + key, err := GeneratePrivateKey(cc.PublicKeyAlgorithm) + if err != nil { + return nil, fmt.Errorf("unable to create private key while generating CA certificate, err: %w", err) + } + + cert, err := certutil.NewSelfSignedCACert(cc.Config, key) + if err != nil { + return nil, fmt.Errorf("unable to create self-signed CA certificate, err: %w", err) + } + + encoded, err := keyutil.MarshalPrivateKeyToPEM(key) + if err != nil { + return nil, fmt.Errorf("unable to marshal private key to PEM, err: %w", err) + } + + return &VirtualClusterCert{ + pairName: cc.Name, + caName: cc.CAName, + cert: EncodeCertPEM(cert), + key: encoded, + }, nil +} + +func CreateCertAndKeyFilesWithCA(cc *CertConfig, caCertData, caKeyData []byte) (*VirtualClusterCert, error) { + if len(cc.Config.Usages) == 0 { + return nil, fmt.Errorf("must specify at least one ExtKeyUsage") + } + + cc.defaultNotAfter() + cc.defaultPublicKeyAlgorithm() + + key, err := GeneratePrivateKey(cc.PublicKeyAlgorithm) + if err != nil { + return nil, fmt.Errorf("unable to create private key, err: %w", err) + } + + caCerts, err := certutil.ParseCertsPEM(caCertData) + if err != nil { + return nil, err + } + + caKey, err := ParsePrivateKeyPEM(caKeyData) + if err != nil { + return nil, err + } + + // Safely pick the first one because the sender's certificate must come first in the list. + // For details, see: https://www.rfc-editor.org/rfc/rfc4346#section-7.4.2 + caCert := caCerts[0] + + cert, err := NewSignedCert(cc, key, caCert, caKey, false) + if err != nil { + return nil, err + } + + encoded, err := keyutil.MarshalPrivateKeyToPEM(key) + if err != nil { + return nil, fmt.Errorf("unable to marshal private key to PEM, err: %w", err) + } + + return &VirtualClusterCert{ + pairName: cc.Name, + caName: cc.CAName, + cert: EncodeCertPEM(cert), + key: encoded, + }, nil +} + +func EncodeCertPEM(cert *x509.Certificate) []byte { + block := pem.Block{ + Type: constants.CertificateBlockType, + Bytes: cert.Raw, + } + return pem.EncodeToMemory(&block) +} + +func GeneratePrivateKey(keyType x509.PublicKeyAlgorithm) (crypto.Signer, error) { + if keyType == x509.ECDSA { + return ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader) + } + + return rsa.GenerateKey(cryptorand.Reader, constants.RsaKeySize) +} + +func ParsePrivateKeyPEM(keyData []byte) (crypto.Signer, error) { + caPrivateKey, err := keyutil.ParsePrivateKeyPEM(keyData) + if err != nil { + return nil, err + } + + // Allow RSA and ECDSA formats only + var key crypto.Signer + switch k := caPrivateKey.(type) { + case *rsa.PrivateKey: + key = k + case *ecdsa.PrivateKey: + key = k + default: + return nil, errors.New("the private key is neither in RSA nor ECDSA format") + } + + return key, nil +} + +func NewSignedCert(cc *CertConfig, key crypto.Signer, caCert *x509.Certificate, caKey crypto.Signer, isCA bool) (*x509.Certificate, error) { + serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64)) + if err != nil { + return nil, err + } + if len(cc.Config.CommonName) == 0 { + return nil, fmt.Errorf("must specify a CommonName") + } + + keyUsage := x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature + if isCA { + keyUsage |= x509.KeyUsageCertSign + } + + RemoveDuplicateAltNames(&cc.Config.AltNames) + notAfter := 
time.Now().Add(constants.CertificateValidity).UTC() + if cc.NotAfter != nil { + notAfter = *cc.NotAfter + } + + certTmpl := x509.Certificate{ + Subject: pkix.Name{ + CommonName: cc.Config.CommonName, + Organization: cc.Config.Organization, + }, + DNSNames: cc.Config.AltNames.DNSNames, + IPAddresses: cc.Config.AltNames.IPs, + SerialNumber: serial, + NotBefore: caCert.NotBefore, + NotAfter: notAfter, + KeyUsage: keyUsage, + ExtKeyUsage: cc.Config.Usages, + BasicConstraintsValid: true, + IsCA: isCA, + } + certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &certTmpl, caCert, key.Public(), caKey) + if err != nil { + return nil, err + } + return x509.ParseCertificate(certDERBytes) +} + +func RemoveDuplicateAltNames(altNames *certutil.AltNames) { + if altNames == nil { + return + } + + if altNames.DNSNames != nil { + altNames.DNSNames = sets.NewString(altNames.DNSNames...).List() + } + + ipsKeys := make(map[string]struct{}) + var ips []net.IP + for _, one := range altNames.IPs { + if _, ok := ipsKeys[one.String()]; !ok { + ipsKeys[one.String()] = struct{}{} + ips = append(ips, one) + } + } + altNames.IPs = ips +} + +func VirtualClusterCertClient() *CertConfig { + return &CertConfig{ + Name: "virtualCluster-client", + CAName: constants.CaCertAndKeyName, + Config: certutil.Config{ + CommonName: "system:admin", + Organization: []string{"system:masters"}, + Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + }, + AltNamesMutatorFunc: makeAltNamesMutator(apiServerAltNamesMutator), + } +} diff --git a/pkg/kubenest/util/cert/store.go b/pkg/kubenest/util/cert/store.go new file mode 100644 index 000000000..64511ebd2 --- /dev/null +++ b/pkg/kubenest/util/cert/store.go @@ -0,0 +1,93 @@ +package cert + +import ( + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" +) + +type CertStore interface { + AddCert(cert *VirtualClusterCert) + GetCert(name string) *VirtualClusterCert + CertList() []*VirtualClusterCert + LoadCertFromSecret(secret *corev1.Secret) error +} + +type splitToPairNameFunc func(name string) string + +type VirtualClusterCertStore struct { + certs map[string]*VirtualClusterCert + pairNameFunc splitToPairNameFunc +} + +func NewCertStore() CertStore { + return &VirtualClusterCertStore{ + certs: make(map[string]*VirtualClusterCert), + pairNameFunc: SplitToPairName, + } +} + +func SplitToPairName(name string) string { + if strings.Contains(name, constants.KeyExtension) { + strArr := strings.Split(name, constants.KeyExtension) + return strArr[0] + } + + if strings.Contains(name, constants.CertExtension) { + strArr := strings.Split(name, constants.CertExtension) + return strArr[0] + } + + return name +} + +func (store *VirtualClusterCertStore) AddCert(cert *VirtualClusterCert) { + store.certs[cert.pairName] = cert +} + +func (store *VirtualClusterCertStore) GetCert(name string) *VirtualClusterCert { + for _, c := range store.certs { + if c.pairName == name { + return c + } + } + return nil +} + +func (store *VirtualClusterCertStore) CertList() []*VirtualClusterCert { + certs := make([]*VirtualClusterCert, 0, len(store.certs)) + + for _, c := range store.certs { + certs = append(certs, c) + } + + return certs +} + +func (store *VirtualClusterCertStore) LoadCertFromSecret(secret *corev1.Secret) error { + if len(secret.Data) == 0 { + return fmt.Errorf("cert data is empty") + } + + for name, data := range secret.Data { + pairName := store.pairNameFunc(name) + kc := store.GetCert(pairName) + if kc == nil { + kc = &VirtualClusterCert{ 
+ pairName: pairName, + } + } + if strings.Contains(name, constants.CertExtension) { + kc.cert = data + } + if strings.Contains(name, constants.KeyExtension) { + kc.key = data + } + store.AddCert(kc) + } + + return nil +} diff --git a/pkg/kubenest/util/helper.go b/pkg/kubenest/util/helper.go new file mode 100644 index 000000000..ff572effb --- /dev/null +++ b/pkg/kubenest/util/helper.go @@ -0,0 +1,28 @@ +package util + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +// CreateOrUpdateDeployment creates the given deployment, falling back to an update if it already exists. +func CreateOrUpdateDeployment(client clientset.Interface, deployment *appsv1.Deployment) error { + _, err := client.AppsV1().Deployments(deployment.GetNamespace()).Create(context.TODO(), deployment, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + + _, err := client.AppsV1().Deployments(deployment.GetNamespace()).Update(context.TODO(), deployment, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + klog.V(5).InfoS("Successfully created or updated deployment", "deployment", deployment.GetName()) + return nil +} diff --git a/pkg/kubenest/util/image.go b/pkg/kubenest/util/image.go new file mode 100644 index 000000000..bfea9d98e --- /dev/null +++ b/pkg/kubenest/util/image.go @@ -0,0 +1,20 @@ +package util + +import ( + "os" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/utils" +) + +// GetImageMessage returns the image repository and version, taking them from the environment and falling back to the package defaults. +func GetImageMessage() (imageRepository string, imageVersion string) { + imageRepository = os.Getenv(constants.DefauleImageRepositoryEnv) + if len(imageRepository) == 0 { + imageRepository = utils.DefaultImageRepository + } + imageVersion = os.Getenv(constants.DefauleImageVersionEnv) + if len(imageVersion) == 0 { + imageVersion = utils.DefaultImageVersion + } + return imageRepository, imageVersion +} diff --git a/pkg/kubenest/util/kubeconfig.go b/pkg/kubenest/util/kubeconfig.go new file mode 100644 index 000000000..8536a9c47 --- /dev/null +++ b/pkg/kubenest/util/kubeconfig.go @@ -0,0 +1,38 @@ +package util + +import ( + "fmt" + + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" +) + +// CreateWithCerts builds a kubeconfig that authenticates with the given client certificate and key. +func CreateWithCerts(serverURL, clusterName, userName string, caCert []byte, clientKey []byte, clientCert []byte) *clientcmdapi.Config { + config := CreateBasic(serverURL, clusterName, userName, caCert) + config.AuthInfos[userName] = &clientcmdapi.AuthInfo{ + ClientKeyData: clientKey, + ClientCertificateData: clientCert, + } + return config +} + +// CreateBasic builds a kubeconfig skeleton with the cluster, context, and CA data filled in but no credentials. +func CreateBasic(serverURL, clusterName, userName string, caCert []byte) *clientcmdapi.Config { + // Use the cluster and the username as the context name + contextName := fmt.Sprintf("%s@%s", userName, clusterName) + + return &clientcmdapi.Config{ + Clusters: map[string]*clientcmdapi.Cluster{ + clusterName: { + Server: serverURL, + CertificateAuthorityData: caCert, + }, + }, + Contexts: map[string]*clientcmdapi.Context{ + contextName: { + Cluster: clusterName, + AuthInfo: userName, + }, + }, + AuthInfos: map[string]*clientcmdapi.AuthInfo{}, + CurrentContext: contextName, + } +} diff --git a/pkg/kubenest/util/template.go b/pkg/kubenest/util/template.go new file mode 100644 index 000000000..a8ba8872e --- /dev/null +++ b/pkg/kubenest/util/template.go @@ -0,0 +1,21 @@ +package util + +import ( + "bytes" + "fmt" + "text/template" +) + +// ParseTemplate parses the given template string and renders it with obj, returning the resulting bytes. +func ParseTemplate(strtmpl string, obj interface{})
([]byte, error) { + var buf bytes.Buffer + tmpl, err := template.New("template").Parse(strtmpl) + if err != nil { + return nil, fmt.Errorf("error when parsing template, err: %w", err) + } + err = tmpl.Execute(&buf, obj) + if err != nil { + return nil, fmt.Errorf("error when executing template, err: %w", err) + } + return buf.Bytes(), nil +} diff --git a/pkg/kubenest/workflow/phase.go b/pkg/kubenest/workflow/phase.go new file mode 100644 index 000000000..e3f3e74a0 --- /dev/null +++ b/pkg/kubenest/workflow/phase.go @@ -0,0 +1,99 @@ +package workflow + +import "k8s.io/klog/v2" + +type Phase struct { + Tasks []Task + runData RunData + runDataInitializer func() (RunData, error) +} + +type Task struct { + Name string + Run func(RunData) error + Skip func(RunData) (bool, error) + Tasks []Task + RunSubTasks bool +} + +type RunData = interface{} + +func NewPhase() *Phase { + return &Phase{ + Tasks: []Task{}, + } +} + +func (p *Phase) AppendTask(t Task) { + p.Tasks = append(p.Tasks, t) +} + +func (p *Phase) initData() (RunData, error) { + if p.runData == nil && p.runDataInitializer != nil { + var err error + if p.runData, err = p.runDataInitializer(); err != nil { + klog.ErrorS(err, "failed to initialize running data") + return nil, err + } + } + + return p.runData, nil +} + +func (p *Phase) SetDataInitializer(build func() (RunData, error)) { + p.runDataInitializer = build +} + +func (p *Phase) Run() error { + runData := p.runData + if runData == nil { + if _, err := p.initData(); err != nil { + return err + } + } + + for _, t := range p.Tasks { + if err := run(t, p.runData); err != nil { + return err + } + } + + return nil +} + +func (p *Phase) Init() error { + runData := p.runData + if runData == nil { + if _, err := p.initData(); err != nil { + return err + } + } + return nil +} + +func run(t Task, data RunData) error { + if t.Skip != nil { + skip, err := t.Skip(data) + if err != nil { + return err + } + if skip { + return nil + } + } + + if t.Run != nil { + if err := t.Run(data); err != nil { + return err + } + if t.RunSubTasks { + for _, p := range t.Tasks { + if err := run(p, data); err != nil { + return err + } + } + } + } + + return nil +} diff --git a/pkg/treeoperator/constants/constant.go b/pkg/treeoperator/constants/constant.go deleted file mode 100644 index e5326e047..000000000 --- a/pkg/treeoperator/constants/constant.go +++ /dev/null @@ -1,11 +0,0 @@ -package constants - -const ( - InitControllerName = "virtual-cluster-init-controller" - JoinControllerName = "virtual-cluster-join-controller" - - DefauleImageRepositoryEnv = "IMAGE_REPOSITIRY" - DefauleImageVersionEnv = "IMAGE_VERSION" - VirtualClusterStatusCompleted = "Completed" - VirtualClusterFinalizerName = "kosmos.io/virtual-cluster-finalizer" -) diff --git a/pkg/treeoperator/workflow/phase.go b/pkg/treeoperator/workflow/phase.go deleted file mode 100644 index 1af3f38ca..000000000 --- a/pkg/treeoperator/workflow/phase.go +++ /dev/null @@ -1,25 +0,0 @@ -package workflow - -type Phase struct { - Tasks []Task -} - -type Task struct { - Name string - Tasks []Task -} - -func NewPhase() *Phase { - return &Phase{ - Tasks: []Task{}, - } -} - -func (p *Phase) AppendTask(t Task) { - p.Tasks = append(p.Tasks, t) -} - -func (p *Phase) Run() error { - //TODO Get the data required for the current task and execute the task - return nil -} diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index 26a3bc5ca..2c676d53a 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -48,6 +48,7 @@ const ( DefaultNamespace = 
"kosmos-system" DefaultClusterName = "kosmos-control-cluster" DefaultImageRepository = "ghcr.io/kosmos-io" + DefaultImageVersion = "v1.21.5-eki.0" DefaultWaitTime = 120 RootClusterAnnotationKey = "kosmos.io/cluster-role" RootClusterAnnotationValue = "root"