From afd99785bd873c4aac78695104de5100ec3e92fb Mon Sep 17 00:00:00 2001 From: renxiangyu Date: Thu, 23 May 2024 20:28:15 +0800 Subject: [PATCH] fix: add kube-proxy in virtualcluster Signed-off-by: renxiangyu --- pkg/kubenest/constants/constant.go | 3 + .../virtualcluster_init_controller.go | 2 +- pkg/kubenest/controlplane/proxy.go | 124 +++++++++++++++ pkg/kubenest/init.go | 3 + .../controlplane/proxy/mainfests_daemonset.go | 147 +++++++++++++++++ .../controlplane/proxy/manifests_rbac.go | 11 ++ pkg/kubenest/tasks/proxy.go | 149 ++++++++++++++++++ pkg/kubenest/util/helper.go | 60 +++++++ 8 files changed, 498 insertions(+), 1 deletion(-) create mode 100644 pkg/kubenest/controlplane/proxy.go create mode 100644 pkg/kubenest/manifest/controlplane/proxy/mainfests_daemonset.go create mode 100644 pkg/kubenest/manifest/controlplane/proxy/manifests_rbac.go create mode 100644 pkg/kubenest/tasks/proxy.go diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 132583bd5..8cff56058 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -48,6 +48,9 @@ const ( ApiServerCallRetryInterval = 100 * time.Millisecond APIServerSVCPortName = "client" + //controplane proxy + Proxy = "kube-proxy" + //controlplane etcd Etcd = "etcd" EtcdReplicas = 3 diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 9ada6ba87..b8f0c56c9 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/pkg/errors" "gopkg.in/yaml.v3" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,6 +31,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/pkg/errors" ) type VirtualClusterInitController struct { diff --git a/pkg/kubenest/controlplane/proxy.go b/pkg/kubenest/controlplane/proxy.go new file mode 100644 index 000000000..459bdfac6 --- /dev/null +++ b/pkg/kubenest/controlplane/proxy.go @@ -0,0 +1,124 @@ +package controlplane + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/yaml" + clientset "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/proxy" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/pkg/errors" +) + +func EnsureVirtualClusterProxy(client clientset.Interface, kubeconfigString string) error { + // install kube-proxy ds in virtual cluster + if err := installProxyDaemonSet(client); err != nil { + return fmt.Errorf("failed to install virtual cluster proxy, err: %w", err) + } + + // install kube-proxy cm in virtual cluster + if err := installProxyConfigMap(client, kubeconfigString); err != nil { + return fmt.Errorf("failed to install virtual cluster proxy, err: %w", err) + } + + // install kube-proxy sa in virtual cluster + if err := installProxySA(client); err != nil { + return fmt.Errorf("failed to install virtual cluster proxy, err: %w", err) + } + return nil +} + +func DeleteVirtualClusterProxy(client clientset.Interface) error { + daemonSetName := fmt.Sprintf("%s-%s", "kube", "proxy") + daemonSetNameSpace := fmt.Sprintf("%s-%s", "kube", "system") + if err := util.DeleteDaemonSet(client, daemonSetName, daemonSetNameSpace); err != nil { + return errors.Wrapf(err, "Failed to delete daemonSet %s/%s", daemonSetName, daemonSetNameSpace) + } + + cmName := fmt.Sprintf("%s-%s", "kube", "proxy") + cmNameSpace := fmt.Sprintf("%s-%s", "kube", "system") + if err := util.DeleteConfigmap(client, cmName, cmNameSpace); err != nil { + return errors.Wrapf(err, "Failed to delete ConfigMap %s/%s", cmName, cmNameSpace) + } + + saName := fmt.Sprintf("%s-%s", "kube", "proxy") + saNameSpace := fmt.Sprintf("%s-%s", "kube", "system") + if err := util.DeleteServiceAccount(client, saName, saNameSpace); err != nil { + return errors.Wrapf(err, "Failed to delete ServiceAccount %s/%s", saName, saNameSpace) + } + return nil +} + +func installProxyDaemonSet(client clientset.Interface) error { + imageRepository, imageVersion := util.GetImageMessage() + + proxyDaemonSetBytes, err := util.ParseTemplate(proxy.ProxyDaemonSet, struct { + DaemonSetName, Namespace, ImageRepository, Version string + }{ + DaemonSetName: fmt.Sprintf("%s-%s", "kube", "proxy"), + Namespace: fmt.Sprintf("%s-%s", "kube", "system"), + ImageRepository: imageRepository, + Version: imageVersion, + }) + if err != nil { + return fmt.Errorf("error when parsing virtual cluster proxy daemonSet template: %w", err) + } + + proxyDaemonSet := &appsv1.DaemonSet{} + if err := yaml.Unmarshal([]byte(proxyDaemonSetBytes), proxyDaemonSet); err != nil { + return fmt.Errorf("error when decoding virtual cluster proxy daemonSet: %w", err) + } + + if err := util.CreateOrUpdateDaemonSet(client, proxyDaemonSet); err != nil { + return fmt.Errorf("error when creating daemonSet for %s, err: %w", proxyDaemonSet.Name, err) + } + return nil +} + +func installProxyConfigMap(client clientset.Interface, kubeconfigString string) error { + proxyConfigMapBytes, err := util.ParseTemplate(proxy.ProxyConfigMap, struct { + ConfigMapName, Namespace, KubeProxyKubeConfig string + }{ + ConfigMapName: fmt.Sprintf("%s-%s", "kube", "proxy"), + Namespace: fmt.Sprintf("%s-%s", "kube", "system"), + KubeProxyKubeConfig: kubeconfigString, + }) + if err != nil { + return fmt.Errorf("error when parsing virtual cluster proxy configmap template: %w", err) + } + + proxyConfigMap := &corev1.ConfigMap{} + if err := yaml.Unmarshal([]byte(proxyConfigMapBytes), proxyConfigMap); err != nil { + return fmt.Errorf("error when decoding virtual cluster proxy configmap: %w", err) + } + + if err := util.CreateOrUpdateConfigMap(client, proxyConfigMap); err != nil { + return fmt.Errorf("error when creating configmap for %s, err: %w", proxyConfigMap.Name, err) + } + return nil +} + +func installProxySA(client clientset.Interface) error { + proxySABytes, err := util.ParseTemplate(proxy.ProxySA, struct { + SAName, Namespace string + }{ + SAName: fmt.Sprintf("%s-%s", "kube", "proxy"), + Namespace: fmt.Sprintf("%s-%s", "kube", "system"), + }) + if err != nil { + return fmt.Errorf("error when parsing virtual cluster proxy SA template: %w", err) + } + + proxySA := &corev1.ServiceAccount{} + if err := yaml.Unmarshal([]byte(proxySABytes), proxySA); err != nil { + return fmt.Errorf("error when decoding virtual cluster proxy SA: %w", err) + } + + if err := util.CreateOrUpdateServiceAccount(client, proxySA); err != nil { + return fmt.Errorf("error when creating SA for %s, err: %w", proxySA.Name, err) + } + return nil +} diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index 727af0b27..a6cd6405d 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -63,6 +63,8 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase { initPhase.AppendTask(tasks.NewAnpTask()) initPhase.AppendTask(tasks.NewComponentTask()) initPhase.AppendTask(tasks.NewCheckControlPlaneTask()) + // create proxy + initPhase.AppendTask(tasks.NewVirtualClusterProxyTask()) // create core-dns initPhase.AppendTask(tasks.NewCoreDNSTask()) // add server @@ -84,6 +86,7 @@ func UninstallPhase(opts *InitOptions) *workflow.Phase { destroyPhase.AppendTask(tasks.UninstallVirtualClusterServiceTask()) destroyPhase.AppendTask(tasks.UninstallCertsAndKubeconfigTask()) destroyPhase.AppendTask(tasks.DeleteEtcdPvcTask()) + destroyPhase.AppendTask(tasks.UninstallVirtualClusterProxyTask()) destroyPhase.SetDataInitializer(func() (workflow.RunData, error) { return newRunData(opts) diff --git a/pkg/kubenest/manifest/controlplane/proxy/mainfests_daemonset.go b/pkg/kubenest/manifest/controlplane/proxy/mainfests_daemonset.go new file mode 100644 index 000000000..d3e82aab9 --- /dev/null +++ b/pkg/kubenest/manifest/controlplane/proxy/mainfests_daemonset.go @@ -0,0 +1,147 @@ +package proxy + +const ( + ProxyDaemonSet = ` +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: {{ .DaemonSetName }} + namespace: {{ .Namespace }} + labels: + virtualCluster-app: kube-proxy + app.kubernetes.io/managed-by: virtual-cluster-controller +spec: + revisionHistoryLimit: 10 + selector: + matchLabels: + app.kubernetes.io/managed-by: virtual-cluster-controller + template: + metadata: + labels: + app.kubernetes.io/managed-by: virtual-cluster-controller + spec: + containers: + - command: + - /usr/local/bin/kube-proxy + - --config=/var/lib/kube-proxy/config.conf + - --hostname-override=$(NODE_NAME) + env: + - name: NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + image: {{ .ImageRepository }}/kube-proxy:{{ .Version }} + imagePullPolicy: IfNotPresent + name: kube-proxy + resources: {} + securityContext: + privileged: true + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /var/lib/kube-proxy + name: kube-proxy + - mountPath: /run/xtables.lock + name: xtables-lock + - mountPath: /lib/modules + name: lib-modules + readOnly: true + dnsPolicy: ClusterFirst + hostNetwork: true + nodeSelector: + kubernetes.io/os: linux + priorityClassName: system-node-critical + restartPolicy: Always + schedulerName: default-scheduler + securityContext: {} + serviceAccount: kube-proxy + serviceAccountName: kube-proxy + terminationGracePeriodSeconds: 30 + tolerations: + - operator: Exists + volumes: + - configMap: + defaultMode: 420 + name: kube-proxy + name: kube-proxy + - hostPath: + path: /run/xtables.lock + type: FileOrCreate + name: xtables-lock + - hostPath: + path: /lib/modules + type: "" + name: lib-modules + updateStrategy: + rollingUpdate: + maxSurge: 0 + maxUnavailable: 1 + type: RollingUpdate +` + ProxyConfigMap = ` +apiVersion: v1 +data: + config.conf: |- + apiVersion: kubeproxy.config.k8s.io/v1alpha1 + bindAddress: 0.0.0.0 + bindAddressHardFail: false + clientConnection: + acceptContentTypes: "" + burst: 100 + contentType: "" + kubeconfig: /var/lib/kube-proxy/kubeconfig.conf + qps: 100 + clusterCIDR: 172.19.0.0/16,fd22:2222:2222::/48 + configSyncPeriod: 0s + conntrack: + maxPerCore: null + min: null + tcpCloseWaitTimeout: null + tcpEstablishedTimeout: null + detectLocal: + bridgeInterface: "" + interfaceNamePrefix: "" + detectLocalMode: "" + enableProfiling: false + healthzBindAddress: "" + hostnameOverride: "" + iptables: + masqueradeAll: true + masqueradeBit: null + minSyncPeriod: 0s + syncPeriod: 0s + ipvs: + excludeCIDRs: + - 192.0.0.1/32 + minSyncPeriod: 0s + scheduler: "" + strictARP: false + syncPeriod: 0s + tcpFinTimeout: 0s + tcpTimeout: 0s + udpTimeout: 0s + kind: KubeProxyConfiguration + metricsBindAddress: 0.0.0.0:10249 + mode: ipvs + nodePortAddresses: null + oomScoreAdj: null + portRange: "" + showHiddenMetricsForVersion: "" + udpIdleTimeout: 0s + winkernel: + enableDSR: false + forwardHealthCheckVip: false + networkName: "" + rootHnsEndpointName: "" + sourceVip: "" + kubeconfig.conf: |- + {{ .KubeProxyKubeConfig }} +kind: ConfigMap +metadata: + labels: + app: kube-proxy + name: {{ .ConfigMapName }} + namespace: {{ .Namespace }} +` +) diff --git a/pkg/kubenest/manifest/controlplane/proxy/manifests_rbac.go b/pkg/kubenest/manifest/controlplane/proxy/manifests_rbac.go new file mode 100644 index 000000000..5d154f291 --- /dev/null +++ b/pkg/kubenest/manifest/controlplane/proxy/manifests_rbac.go @@ -0,0 +1,11 @@ +package proxy + +const ( + ProxySA = ` +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .SAName }} + namespace: {{ .Namespace }} +` +) diff --git a/pkg/kubenest/tasks/proxy.go b/pkg/kubenest/tasks/proxy.go new file mode 100644 index 000000000..91d070411 --- /dev/null +++ b/pkg/kubenest/tasks/proxy.go @@ -0,0 +1,149 @@ +package tasks + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" + apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" + "github.com/pkg/errors" +) + +var ( + virtualClusterProxyLabels = labels.Set{constants.Label: constants.Proxy} +) + +func NewVirtualClusterProxyTask() workflow.Task { + return workflow.Task{ + Name: "proxy", + Run: runProxy, + RunSubTasks: true, + Tasks: []workflow.Task{ + { + Name: "deploy-proxy", + Run: runVirtualClusterProxy, + }, + { + Name: "check-proxy", + Run: runCheckVirtualClusterProxy, + }, + }, + } +} + +func runProxy(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("proxy task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[proxy] Running proxy task", "virtual cluster", klog.KObj(data)) + return nil +} + +func runVirtualClusterProxy(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("Virtual cluster proxy task invoked with an invalid data struct") + } + + // Get the kubeconfig of virtual cluster and put it into the cm of kube-proxy + secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), + fmt.Sprintf("%s-%s", data.GetName(), constants.AdminConfig), metav1.GetOptions{}) + if err != nil { + return errors.Wrap(err, "Get virtualcluster kubeconfig secret error") + } + config, err := clientcmd.RESTConfigFromKubeConfig(secret.Data[constants.KubeConfig]) + if err != nil { + return err + } + client, err := clientset.NewForConfig(config) + if err != nil { + return err + } + var virtualClient clientset.Interface = client + + kubeconfigString := string(secret.Data[constants.KubeConfig]) + + err = controlplane.EnsureVirtualClusterProxy( + virtualClient, + kubeconfigString, + ) + if err != nil { + return fmt.Errorf("failed to install virtual cluster proxy component, err: %w", err) + } + + klog.V(2).InfoS("[VirtualClusterProxy] Successfully installed virtual cluster proxy component", "virtual cluster", klog.KObj(data)) + return nil +} + +func runCheckVirtualClusterProxy(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("check-VirtualClusterProxy task invoked with an invalid data struct") + } + + checker := apiclient.NewVirtualClusterChecker(data.RemoteClient(), constants.ComponentBeReadyTimeout) + + err := checker.WaitForSomePods(virtualClusterProxyLabels.String(), data.GetNamespace(), 1) + if err != nil { + return fmt.Errorf("checking for virtual cluster proxy to ready timeout, err: %w", err) + } + + klog.V(2).InfoS("[check-VirtualClusterProxy] the virtual cluster proxy is ready", "virtual cluster", klog.KObj(data)) + return nil +} + +func UninstallVirtualClusterProxyTask() workflow.Task { + return workflow.Task{ + Name: "proxy", + Run: runProxy, + RunSubTasks: true, + Tasks: []workflow.Task{ + { + Name: constants.ApiServer, + Run: uninstallVirtualClusterProxy, + }, + }, + } +} + +func uninstallVirtualClusterProxy(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("Virtual cluster proxy task invoked with an invalid data struct") + } + + secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), + fmt.Sprintf("%s-%s", data.GetName(), constants.AdminConfig), metav1.GetOptions{}) + if err != nil { + return errors.Wrap(err, "Get virtualcluster kubeconfig secret error") + } + config, err := clientcmd.RESTConfigFromKubeConfig(secret.Data[constants.KubeConfig]) + if err != nil { + return err + } + client, err := clientset.NewForConfig(config) + if err != nil { + return err + } + var virtualClient clientset.Interface = client + + err = controlplane.DeleteVirtualClusterProxy( + virtualClient, + ) + if err != nil { + return fmt.Errorf("failed to install virtual cluster proxy component, err: %w", err) + } + + klog.V(2).InfoS("[VirtualClusterProxy] Successfully uninstalled virtual cluster proxy component", "virtual cluster", klog.KObj(data)) + return nil +} diff --git a/pkg/kubenest/util/helper.go b/pkg/kubenest/util/helper.go index a442c98a8..4c15d0363 100644 --- a/pkg/kubenest/util/helper.go +++ b/pkg/kubenest/util/helper.go @@ -54,6 +54,66 @@ func DeleteDeployment(client clientset.Interface, deployment string, namespace s return nil } +func CreateOrUpdateDaemonSet(client clientset.Interface, daemonSet *appsv1.DaemonSet) error { + _, err := client.AppsV1().DaemonSets(daemonSet.GetNamespace()).Create(context.TODO(), daemonSet, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + + _, err := client.AppsV1().DaemonSets(daemonSet.GetNamespace()).Update(context.TODO(), daemonSet, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + klog.V(5).InfoS("Successfully created or updated daemonSet", "daemonSet", daemonSet.GetName()) + return nil +} + +func DeleteDaemonSet(client clientset.Interface, daemonSet string, namespace string) error { + err := client.AppsV1().DaemonSets(namespace).Delete(context.TODO(), daemonSet, metav1.DeleteOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.V(2).Infof("DaemonSet %s/%s not found, skip delete", daemonSet, namespace) + return nil + } + return err + } + klog.V(2).Infof("Delete daemonSet %s/%s success", daemonSet, namespace) + return nil +} + +func CreateOrUpdateServiceAccount(client clientset.Interface, serviceAccount *v1.ServiceAccount) error { + _, err := client.CoreV1().ServiceAccounts(serviceAccount.GetNamespace()).Create(context.TODO(), serviceAccount, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + + _, err := client.CoreV1().ServiceAccounts(serviceAccount.GetNamespace()).Update(context.TODO(), serviceAccount, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + + klog.V(5).InfoS("Successfully created or updated serviceAccount", "serviceAccount", serviceAccount.GetName()) + return nil +} + +func DeleteServiceAccount(client clientset.Interface, serviceAccount string, namespace string) error { + err := client.CoreV1().ServiceAccounts(namespace).Delete(context.TODO(), serviceAccount, metav1.DeleteOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.V(2).Infof("ServiceAccount %s/%s not found, skip delete", serviceAccount, namespace) + return nil + } + return err + } + klog.V(2).Infof("Delete serviceAccount %s/%s success", serviceAccount, namespace) + return nil +} + func CreateOrUpdateConfigMap(client clientset.Interface, configMap *v1.ConfigMap) error { _, err := client.CoreV1().ConfigMaps(configMap.GetNamespace()).Create(context.TODO(), configMap, metav1.CreateOptions{}) if err != nil {