From 1cf8e8aa85a794a446da4e5c7c5e92d385e09c52 Mon Sep 17 00:00:00 2001 From: wangdepeng Date: Thu, 16 May 2024 18:34:57 +0800 Subject: [PATCH 1/2] feat: hostPortManager support multiport for one virtualCluster Signed-off-by: wangdepeng (cherry picked from commit f23166174b6d9c428025e753ea88d0c42ac79a4f) --- deploy/crds/kosmos.io_virtualclusters.yaml | 5 ++++ .../kosmos/v1alpha1/virtualcluster_types.go | 2 ++ .../kosmos/v1alpha1/zz_generated.deepcopy.go | 7 +++++ pkg/generated/openapi/zz_generated.openapi.go | 15 ++++++++++ pkg/kubenest/constants/constant.go | 7 +++-- .../virtualcluster_init_controller.go | 30 ++++++++++++++----- pkg/kubenest/init.go | 5 ++++ pkg/kubenest/tasks/apiserver.go | 2 +- pkg/kubenest/tasks/data.go | 1 + pkg/kubenest/tasks/service.go | 3 +- 10 files changed, 66 insertions(+), 11 deletions(-) diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml index 4439823b1..aa88214bc 100644 --- a/deploy/crds/kosmos.io_virtualclusters.yaml +++ b/deploy/crds/kosmos.io_virtualclusters.yaml @@ -147,6 +147,11 @@ spec: port: format: int32 type: integer + portMap: + additionalProperties: + format: int32 + type: integer + type: object reason: type: string updateTime: diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go index 1a016843f..2c8b27e66 100644 --- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go @@ -99,6 +99,8 @@ type VirtualClusterStatus struct { UpdateTime *metav1.Time `json:"updateTime,omitempty" protobuf:"bytes,7,opt,name=updateTime"` // +optional Port int32 `json:"port,omitempty"` + // +optional + PortMap map[string]int32 `json:"portMap,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go index 9230fa45e..1e482c9ae 100644 --- a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go @@ -1753,6 +1753,13 @@ func (in *VirtualClusterStatus) DeepCopyInto(out *VirtualClusterStatus) { in, out := &in.UpdateTime, &out.UpdateTime *out = (*in).DeepCopy() } + if in.PortMap != nil { + in, out := &in.PortMap, &out.PortMap + *out = make(map[string]int32, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 88b55a26e..defaec587 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -3058,6 +3058,21 @@ func schema_pkg_apis_kosmos_v1alpha1_VirtualClusterStatus(ref common.ReferenceCa Format: "int32", }, }, + "portMap": { + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: 0, + Type: []string{"integer"}, + Format: "int32", + }, + }, + }, + }, + }, }, }, }, diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index b2a6133d9..560eb5846 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -84,8 +84,11 @@ const ( DeInitAction Action = "deInit" //host_port_manager - HostPortsCMName = "kosmos-hostports" - HostPortsCMDataName = "config.yaml" + HostPortsCMName = "kosmos-hostports" + HostPortsCMDataName = "config.yaml" + ApiServerPortKey = 
"apiserver-port" + ApiServerNetworkProxyPortKey = "apiserver-network-proxy-port" + VirtualClusterPortNum = 2 ManifestComponentsConfigMap = "components-manifest-cm" NodePoolConfigmap = "node-pool" diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 1a2fabd40..7df0aa455 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -491,7 +491,16 @@ func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { } for _, vc := range vcList.Items { - if vc.Status.Port == port { + // 判断一个map是否包含某个端口 + contains := func(port int32) bool { + for _, p := range vc.Status.PortMap { + if p == port { + return true + } + } + return false + } + if vc.Status.Port == port || contains(port) { return true } } @@ -507,12 +516,19 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 if err != nil { return 0, err } - - for _, port := range hostPool.PortsPool { - if !c.isPortAllocated(port) { - virtualCluster.Status.Port = port - return port, nil + ports := func() []int32 { + ports := make([]int32, 0) + for _, p := range hostPool.PortsPool { + if !c.isPortAllocated(p) { + ports = append(ports, p) + } } + return ports + }() + if len(ports) < constants.VirtualClusterPortNum { + return 0, fmt.Errorf("no available ports to allocate") } - return 0, fmt.Errorf("no available ports to allocate") + virtualCluster.Status.PortMap[constants.ApiServerPortKey] = ports[0] + virtualCluster.Status.PortMap[constants.ApiServerNetworkProxyPortKey] = ports[1] + return 0, err } diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index b139da42a..bc9bfe1f5 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -35,6 +35,7 @@ type initData struct { privateRegistry string externalIP string hostPort int32 + hostPortMap map[string]int32 } type InitOptions struct { @@ -233,6 +234,10 @@ func (i initData) HostPort() int32 { return i.hostPort } +func (i initData) HostPortMap() map[string]int32 { + return i.hostPortMap +} + func (i initData) DynamicClient() *dynamic.DynamicClient { return i.dynamicClient } diff --git a/pkg/kubenest/tasks/apiserver.go b/pkg/kubenest/tasks/apiserver.go index 7d7370189..4c077e8e2 100644 --- a/pkg/kubenest/tasks/apiserver.go +++ b/pkg/kubenest/tasks/apiserver.go @@ -50,7 +50,7 @@ func runVirtualClusterAPIServer(r workflow.RunData) error { data.RemoteClient(), data.GetName(), data.GetNamespace(), - data.HostPort(), + data.HostPortMap()[constants.ApiServerPortKey], ) if err != nil { return fmt.Errorf("failed to install virtual cluster apiserver component, err: %w", err) diff --git a/pkg/kubenest/tasks/data.go b/pkg/kubenest/tasks/data.go index 4cf65a336..eea44e9da 100644 --- a/pkg/kubenest/tasks/data.go +++ b/pkg/kubenest/tasks/data.go @@ -20,5 +20,6 @@ type InitData interface { VirtualClusterVersion() string ExternalIP() string HostPort() int32 + HostPortMap() map[string]int32 DynamicClient() *dynamic.DynamicClient } diff --git a/pkg/kubenest/tasks/service.go b/pkg/kubenest/tasks/service.go index f91eb8a06..eb10e3a79 100644 --- a/pkg/kubenest/tasks/service.go +++ b/pkg/kubenest/tasks/service.go @@ -3,6 +3,7 @@ package tasks import ( "errors" "fmt" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "k8s.io/klog/v2" @@ -44,7 +45,7 @@ func runVirtualClusterService(r workflow.RunData) error { data.RemoteClient(), data.GetName(), data.GetNamespace(), - data.HostPort(), + 
data.HostPortMap()[constants.ApiServerPortKey], ) if err != nil { return fmt.Errorf("failed to install virtual cluster service , err: %w", err) From d6d56d561f24fcb6153271e66229721d0a85e260 Mon Sep 17 00:00:00 2001 From: wangdepeng Date: Fri, 17 May 2024 19:14:30 +0800 Subject: [PATCH 2/2] feat: add anp install workflow Signed-off-by: wangdepeng (cherry picked from commit cff6591b937f65f816590ee39f20ad5865cf269c) --- deploy/virtual-cluster-operator.yml | 336 ++++++++++++++++- go.mod | 2 +- hack/generate_globalnode.sh | 24 +- pkg/kubenest/constants/constant.go | 15 +- .../virtualcluster_init_controller.go | 15 +- pkg/kubenest/controlplane/apiserver.go | 8 +- pkg/kubenest/init.go | 3 + .../apiserver/mainfests_deployment.go | 348 ++++++++++++++++++ .../apiserver/manifeats_configmap.go | 28 ++ pkg/kubenest/tasks/anp.go | 261 +++++++++++++ pkg/kubenest/tasks/apiserver.go | 2 +- pkg/kubenest/tasks/service.go | 2 +- pkg/kubenest/util/cert/certs.go | 1 + pkg/kubenest/util/helper.go | 147 ++++++++ 14 files changed, 1158 insertions(+), 34 deletions(-) create mode 100644 pkg/kubenest/manifest/controlplane/apiserver/manifeats_configmap.go create mode 100644 pkg/kubenest/tasks/anp.go diff --git a/deploy/virtual-cluster-operator.yml b/deploy/virtual-cluster-operator.yml index 3884a668f..6a066e761 100644 --- a/deploy/virtual-cluster-operator.yml +++ b/deploy/virtual-cluster-operator.yml @@ -51,15 +51,287 @@ metadata: name: virtual-cluster-operator namespace: kosmos-system data: - # Generate by script hack/k8s-in-k8s/generate_env.sh env.sh: | - __env.sh__ - # Copy from hack/k8s-in-k8s/kubelet_node_helper.sh + #!/usr/bin/env bash + + SCRIPT_VERSION=0.0.1 + # save tmp file + PATH_FILE_TMP=/apps/conf/kosmos/tmp + ################################################### + # path for kubeadm + PATH_KUBEADM=/usr/bin/kubeadm + ################################################## + # path for kubeadm config + PATH_KUBEADM_CONFIG=/etc/kubeadm + ################################################## + # path for kubernetes + PATH_KUBERNETES=/etc/kubernetes/ + PATH_KUBERNETES_PKI="$PATH_KUBERNETES/pki" + # scpKCCmd.name + KUBELET_KUBE_CONFIG_NAME=kubelet.conf + ################################################## + # path for kubelet + PATH_KUBELET_LIB=/var/lib/kubelet + # scpKubeletConfigCmd.name + KUBELET_CONFIG_NAME=config.yaml kubelet_node_helper.sh: | - __kubelet_node_helper__ - # Obtain through the command "kubectl get cm kubelet-config -nkube-system -oyaml", change dns address to `__DNS_ADDRESS__` + #!/usr/bin/env bash + + source "env.sh" + + # args + DNS_ADDRESS=${2:-10.237.0.10} + LOG_NAME=${2:-kubelet} + JOIN_HOST=$2 + JOIN_TOKEN=$3 + JOIN_CA_HASH=$4 + + function unjoin() { + # before unjoin, you need delete node by kubectl + echo "exec(1/2): kubeadm reset...." + echo "y" | ${PATH_KUBEADM} reset + if [ $? -ne 0 ]; then + exit 1 + fi + + echo "exec(2/2): delete cni...." + if [ -d "/etc/cni/net.d" ]; then + mv /etc/cni/net.d '/etc/cni/net.d.back'`date +%Y_%m_%d_%H_%M_%S` + if [ $? -ne 0 ]; then + exit 1 + fi + fi + } + + function revert() { + if [ ! -f "$PATH_KUBEADM_CONFIG/kubeadm.cfg" ]; then + echo "exec(1/1): execure join cmd" + kubeadm join $JOIN_HOST --token $JOIN_TOKEN --discovery-token-ca-cert-hash $JOIN_CA_HASH + if [ $? -ne 0 ]; then + exit 1 + fi + exit 0 + fi + + echo "exec(1/3): update kubeadm.cfg..." + sed -e "s|token: .*$|token: $JOIN_TOKEN|g" -e "w $PATH_FILE_TMP/kubeadm.cfg.current" "$PATH_KUBEADM_CONFIG/kubeadm.cfg" + if [ $? 
-ne 0 ]; then + exit 1 + fi + + # add taints + echo "exec(2/3): update kubeadm.cfg tanits..." + sed -i "/kubeletExtraArgs/a \ register-with-taints: node.kosmos.io/unschedulable:NoSchedule" "$PATH_FILE_TMP/kubeadm.cfg.current" + if [ $? -ne 0 ]; then + exit 1 + fi + + echo "exec(3/3): execute join cmd...." + kubeadm join --config "$PATH_FILE_TMP/kubeadm.cfg.current" + if [ $? -ne 0 ]; then + exit 1 + fi + } + + # before join, you need upload ca.crt and kubeconfig to tmp dir!!! + function join() { + echo "exec(1/8): stop containerd...." + systemctl stop containerd + if [ $? -ne 0 ]; then + exit 1 + fi + echo "exec(2/8): copy ca.crt...." + cp "$PATH_FILE_TMP/ca.crt" "$PATH_KUBERNETES_PKI/ca.crt" + if [ $? -ne 0 ]; then + exit 1 + fi + echo "exec(3/8): copy kubeconfig...." + cp "$PATH_FILE_TMP/$KUBELET_KUBE_CONFIG_NAME" "$PATH_KUBERNETES/$KUBELET_KUBE_CONFIG_NAME" + if [ $? -ne 0 ]; then + exit 1 + fi + echo "exec(4/8): set core dns address...." + sed -e "s|__DNS_ADDRESS__|$DNS_ADDRESS|g" -e "w ${PATH_KUBELET_LIB}/${KUBELET_CONFIG_NAME}" "$PATH_FILE_TMP"/"$KUBELET_CONFIG_NAME" + if [ $? -ne 0 ]; then + exit 1 + fi + echo "exec(5/8): copy kubeadm-flags.env...." + cp "$PATH_FILE_TMP/kubeadm-flags.env" "$PATH_KUBELET_LIB/kubeadm-flags.env" + if [ $? -ne 0 ]; then + exit 1 + fi + echo "exec(6/8): start containerd" + systemctl start containerd + if [ $? -ne 0 ]; then + exit 1 + fi + + echo "exec(7/8): delete cni...." + if [ -d "/etc/cni/net.d" ]; then + mv /etc/cni/net.d '/etc/cni/net.d.back'`date +%Y_%m_%d_%H_%M_%S` + if [ $? -ne 0 ]; then + exit 1 + fi + fi + + echo "exec(8/8): start kubelet...." + systemctl start kubelet + if [ $? -ne 0 ]; then + exit 1 + fi + } + + function health() { + result=`systemctl is-active containerd` + if [[ $result != "active" ]]; then + echo "health(1/2): containerd is inactive" + exit 1 + else + echo "health(1/2): containerd is active" + fi + + result=`systemctl is-active kubelet` + if [[ $result != "active" ]]; then + echo "health(2/2): kubelet is inactive" + exit 1 + else + echo "health(2/2): containerd is active" + fi + } + + function log() { + systemctl status $LOG_NAME + } + + # check the environments + function check() { + echo "check(1/3): try to create $PATH_FILE_TMP" + if [ ! -d "$PATH_FILE_TMP" ]; then + mkdir -p "$PATH_FILE_TMP" + if [ $? -ne 0 ]; then + exit 1 + fi + fi + + echo "check(2/3): check dir: $PATH_KUBEADM_CONFIG" + if [ ! -d "$PATH_KUBEADM_CONFIG" ]; then + mkdir -p "$PATH_KUBEADM_CONFIG" + if [ $? -ne 0 ]; then + exit 1 + fi + + echo "--- + apiVersion: kubeadm.k8s.io/v1beta2 + discovery: + bootstrapToken: + apiServerEndpoint: apiserver.cluster.local:6443 + token: xxxxxxxx + unsafeSkipCAVerification: true + kind: JoinConfiguration + nodeRegistration: + criSocket: /run/containerd/containerd.sock + kubeletExtraArgs: + container-runtime: remote + container-runtime-endpoint: unix:///run/containerd/containerd.sock + taints: null" > $PATH_KUBEADM_CONFIG/kubeadm.cfg + + fi + + echo "check(3/3): copy kubeadm-flags.env to create $PATH_FILE_TMP , remove args[cloud-provider] and taints" + sed -e "s| --cloud-provider=external | |g" -e "w ${PATH_FILE_TMP}/kubeadm-flags.env" "$PATH_KUBELET_LIB/kubeadm-flags.env" + sed -i "s| --register-with-taints=node.kosmos.io/unschedulable:NoSchedule||g" "${PATH_FILE_TMP}/kubeadm-flags.env" + if [ $? -ne 0 ]; then + exit 1 + fi + + echo "environments is ok" + } + + function version() { + echo "$SCRIPT_VERSION" + } + + # See how we were called. 
+ case "$1" in + unjoin) + unjoin + ;; + join) + join + ;; + health) + health + ;; + check) + check + ;; + log) + log + ;; + revert) + revert + ;; + version) + version + ;; + *) + echo $"usage: $0 unjoin|join|health|log|check|version|revert" + exit 1 + esac config.yaml: | - __config.yaml__ + apiVersion: kubelet.config.k8s.io/v1beta1 + authentication: + anonymous: + enabled: false + webhook: + cacheTTL: 0s + enabled: true + x509: + clientCAFile: /etc/kubernetes/pki/ca.crt + authorization: + mode: Webhook + webhook: + cacheAuthorizedTTL: 0s + cacheUnauthorizedTTL: 0s + cgroupDriver: cgroupfs + clusterDNS: + - __DNS_ADDRESS__ + clusterDomain: cluster.local + cpuManagerReconcilePeriod: 0s + evictionHard: + imagefs.available: 15% + memory.available: 100Mi + nodefs.available: 10% + nodefs.inodesFree: 5% + evictionPressureTransitionPeriod: 5m0s + fileCheckFrequency: 0s + healthzBindAddress: 127.0.0.1 + healthzPort: 10248 + httpCheckFrequency: 0s + imageMinimumGCAge: 0s + kind: KubeletConfiguration + kubeAPIBurst: 100 + kubeAPIQPS: 100 + kubeReserved: + cpu: 140m + memory: 1.80G + logging: + flushFrequency: 0 + options: + json: + infoBufferSize: "0" + verbosity: 0 + memorySwap: {} + nodeStatusReportFrequency: 0s + nodeStatusUpdateFrequency: 0s + rotateCertificates: true + runtimeRequestTimeout: 0s + shutdownGracePeriod: 0s + shutdownGracePeriodCriticalPods: 0s + staticPodPath: /etc/kubernetes/manifests + streamingConnectionIdleTimeout: 0s + syncFrequency: 0s + volumeStatsAggPeriod: 0s --- apiVersion: apps/v1 kind: Deployment @@ -116,12 +388,6 @@ spec: # Enter the ip address of a master node - name: EXECTOR_HOST_MASTER_NODE_IP value: 192.168.0.1 - # env.sh KUBELET_CONFIG_NAME - - name: KUBELET_CONFIG_NAME - value: config.yaml - # env.sh KUBELET_KUBE_CONFIG_NAME - - name: KUBELET_KUBE_CONFIG_NAME - value: kubelet.conf # WEB_USER and WEB_PASS for generate token that can be used to access the node-agent - name: WEB_USER valueFrom: @@ -133,8 +399,6 @@ spec: secretKeyRef: name: node-agent-secret key: password - - name: EXECTOR_SHELL_NAME - value: kubelet_node_helper.sh volumeMounts: - name: credentials mountPath: /etc/virtual-cluster-operator @@ -249,4 +513,46 @@ type: kubernetes.io/basic-auth data: username: {{ .USERNAME }} password: {{ .PASSWORD }} - +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: kosmos-hostports + namespace: kosmos-system +data: + config.yaml: | + # ports allocate for virtual cluster api server,from 33001, increment by 1 for each virtual cluster.Be careful not to use ports that are already in use + portsPool: + - 33001 + - 33002 + - 33003 + - 33004 + - 33005 + - 33006 + - 33007 + - 33008 + - 33009 + - 33010 +--- +apiVersion: v1 +data: + egress_selector_configuration.yaml: | + apiVersion: apiserver.k8s.io/v1beta1 + kind: EgressSelectorConfiguration + egressSelections: + - name: cluster + connection: + proxyProtocol: GRPC + transport: + uds: + udsName: /etc/kubernetes/konnectivity-server/konnectivity-server.socket + - name: master + connection: + proxyProtocol: Direct + - name: etcd + connection: + proxyProtocol: Direct +kind: ConfigMap +metadata: + name: kas-proxy-files + namespace: kas-proxy \ No newline at end of file diff --git a/go.mod b/go.mod index f2d6a9831..592bf813e 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 sigs.k8s.io/controller-runtime v0.14.5 sigs.k8s.io/mcs-api v0.1.0 + sigs.k8s.io/yaml v1.3.0 ) require ( @@ -188,7 +189,6 @@ require ( sigs.k8s.io/kustomize/api v0.12.1 // indirect 
sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect - sigs.k8s.io/yaml v1.3.0 // indirect ) replace ( diff --git a/hack/generate_globalnode.sh b/hack/generate_globalnode.sh index 4862d47cd..b8020f8b9 100755 --- a/hack/generate_globalnode.sh +++ b/hack/generate_globalnode.sh @@ -5,20 +5,36 @@ if [ -z "$KUBECONFIG" ]; then exit 1 fi +# Creating a directory for logs +mkdir -p kube_apply_logs + nodes=$(kubectl get nodes -o jsonpath='{.items[*].metadata.name}') for node in ${nodes}; do nodeIP=$(kubectl get node ${node} -o jsonpath='{.status.addresses[0].address}') labels=$(kubectl get node ${node} -o jsonpath='{.metadata.labels}') - labelsFormatted=$(echo "$labels" | jq -r 'to_entries | .[] | " \(.key): \(.value)"') - echo " + + # Use jq to ensure all values are strings, but also explicitly add quotes in the YAML formatting step below + labelsFormatted=$(echo "$labels" | jq -r 'to_entries | map(.value |= tostring) | .[] | " \(.key): \"\(.value)\""') + + yamlContent=" apiVersion: kosmos.io/v1alpha1 kind: GlobalNode metadata: name: ${node} spec: + state: \"reserved\" nodeIP: \"${nodeIP}\" labels: -$(echo "${labelsFormatted}" | sed 's/=/": "/g' | awk '{print " " $0}') -" | kubectl apply -f - +$(echo "${labelsFormatted}" | awk '{print " " $0}') +" + + # Log the YAML content to a file for inspection + echo "$yamlContent" > kube_apply_logs/${node}.yaml + + # Apply the YAML + echo "$yamlContent" | kubectl apply -f - + done +# clear resources +rm -rf kube_apply_logs \ No newline at end of file diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 560eb5846..87a46229b 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -39,7 +39,7 @@ const ( //controlplane apiserver ApiServer = "apiserver" - ApiServerReplicas = 2 + ApiServerReplicas = 1 ApiServerServiceSubnet = "10.237.6.0/18" ApiServerEtcdListenClientPort = 2379 ApiServerServiceType = "NodePort" @@ -84,11 +84,14 @@ const ( DeInitAction Action = "deInit" //host_port_manager - HostPortsCMName = "kosmos-hostports" - HostPortsCMDataName = "config.yaml" - ApiServerPortKey = "apiserver-port" - ApiServerNetworkProxyPortKey = "apiserver-network-proxy-port" - VirtualClusterPortNum = 2 + HostPortsCMName = "kosmos-hostports" + HostPortsCMDataName = "config.yaml" + ApiServerPortKey = "apiserver-port" + ApiServerNetworkProxyAgentPortKey = "apiserver-network-proxy-agent-port" + ApiServerNetworkProxyServerPortKey = "apiserver-network-proxy-server-port" + ApiServerNetworkProxyHealthPortKey = "apiserver-network-proxy-health-port" + ApiServerNetworkProxyAdminPortKey = "apiserver-network-proxy-admin-port" + VirtualClusterPortNum = 5 ManifestComponentsConfigMap = "components-manifest-cm" NodePoolConfigmap = "node-pool" diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 7df0aa455..4718ba2d7 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -508,10 +508,14 @@ func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { return false } +// AllocateHostPort allocate host port for virtual cluster +// #nosec G602 func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1.VirtualCluster) (int32, error) { c.lock.Lock() defer c.lock.Unlock() - + if len(virtualCluster.Status.PortMap) > 0 || virtualCluster.Status.Port != 0 { + return 0, nil + } 
hostPool, err := GetHostPortPoolFromConfigMap(c.RootClientSet, constants.KosmosNs, constants.HostPortsCMName, constants.HostPortsCMDataName) if err != nil { return 0, err @@ -528,7 +532,14 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 if len(ports) < constants.VirtualClusterPortNum { return 0, fmt.Errorf("no available ports to allocate") } + virtualCluster.Status.PortMap = make(map[string]int32) virtualCluster.Status.PortMap[constants.ApiServerPortKey] = ports[0] - virtualCluster.Status.PortMap[constants.ApiServerNetworkProxyPortKey] = ports[1] + virtualCluster.Status.PortMap[constants.ApiServerNetworkProxyAgentPortKey] = ports[1] + virtualCluster.Status.PortMap[constants.ApiServerNetworkProxyServerPortKey] = ports[2] + virtualCluster.Status.PortMap[constants.ApiServerNetworkProxyHealthPortKey] = ports[3] + virtualCluster.Status.PortMap[constants.ApiServerNetworkProxyAdminPortKey] = ports[4] + + klog.V(4).InfoS("Success allocate virtual cluster ports", "allocate ports", ports, "vc ports", ports[:2]) + return 0, err } diff --git a/pkg/kubenest/controlplane/apiserver.go b/pkg/kubenest/controlplane/apiserver.go index 650afd102..95122f51c 100644 --- a/pkg/kubenest/controlplane/apiserver.go +++ b/pkg/kubenest/controlplane/apiserver.go @@ -13,8 +13,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) -func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, port int32) error { - if err := installAPIServer(client, name, namespace, port); err != nil { +func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, portMap map[string]int32) error { + if err := installAPIServer(client, name, namespace, portMap); err != nil { return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err) } return nil @@ -28,7 +28,7 @@ func DeleteVirtualClusterAPIServer(client clientset.Interface, name, namespace s return nil } -func installAPIServer(client clientset.Interface, name, namespace string, port int32) error { +func installAPIServer(client clientset.Interface, name, namespace string, portMap map[string]int32) error { imageRepository, imageVersion := util.GetImageMessage() clusterIp, err := util.GetEtcdServiceClusterIp(namespace, name+constants.EtcdSuffix, client) if err != nil { @@ -52,7 +52,7 @@ func installAPIServer(client clientset.Interface, name, namespace string, port i EtcdCertsSecret: fmt.Sprintf("%s-%s", name, "etcd-cert"), Replicas: constants.ApiServerReplicas, EtcdListenClientPort: constants.ApiServerEtcdListenClientPort, - ClusterPort: port, + ClusterPort: portMap[constants.ApiServerPortKey], }) if err != nil { return fmt.Errorf("error when parsing virtual cluster apiserver deployment template: %w", err) diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index bc9bfe1f5..c01fc8684 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -57,6 +57,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase { initPhase.AppendTask(tasks.NewVirtualClusterApiserverTask()) initPhase.AppendTask(tasks.NewUploadKubeconfigTask()) initPhase.AppendTask(tasks.NewCheckApiserverHealthTask()) + initPhase.AppendTask(tasks.NewAnpTask()) initPhase.AppendTask(tasks.NewComponentTask()) initPhase.AppendTask(tasks.NewCheckControlPlaneTask()) // create core-dns @@ -75,6 +76,7 @@ func UninstallPhase(opts *InitOptions) *workflow.Phase { destroyPhase.AppendTask(tasks.UninstallCoreDNSTask()) destroyPhase.AppendTask(tasks.UninstallComponentTask()) 
destroyPhase.AppendTask(tasks.UninstallVirtualClusterApiserverTask()) + destroyPhase.AppendTask(tasks.UninstallAnpTask()) destroyPhase.AppendTask(tasks.UninstallEtcdTask()) destroyPhase.AppendTask(tasks.UninstallVirtualClusterServiceTask()) destroyPhase.AppendTask(tasks.UninstallCertsAndKubeconfigTask()) @@ -171,6 +173,7 @@ func newRunData(opt *InitOptions) (*initData, error) { CertStore: cert.NewCertStore(), externalIP: opt.virtualCluster.Spec.ExternalIP, hostPort: opt.virtualCluster.Status.Port, + hostPortMap: opt.virtualCluster.Status.PortMap, }, nil } diff --git a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go index 2104affa0..5b02760c9 100644 --- a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go +++ b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go @@ -128,5 +128,353 @@ spec: - name: etcd-cert secret: secretName: {{ .EtcdCertsSecret }} +` + ApiserverAnpDeployment = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + virtualCluster-app: apiserver + app.kubernetes.io/managed-by: virtual-cluster-controller + name: {{ .DeploymentName }} + namespace: {{ .Namespace }} +spec: + replicas: {{ .Replicas }} + selector: + matchLabels: + virtualCluster-app: apiserver + template: + metadata: + labels: + virtualCluster-app: apiserver + spec: + automountServiceAccountToken: false + hostNetwork: true + tolerations: + - key: "node-role.kubernetes.io/control-plane" + operator: "Exists" + effect: "NoSchedule" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/control-plane + operator: Exists + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: virtualCluster-app + operator: In + values: + - apiserver + topologyKey: kubernetes.io/hostname + containers: + - name: kube-apiserver + image: {{ .ImageRepository }}/kube-apiserver:{{ .Version }} + imagePullPolicy: IfNotPresent + env: + - name: PODIP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + command: + - kube-apiserver + - --allow-privileged=true + - --authorization-mode=Node,RBAC + - --client-ca-file=/etc/virtualcluster/pki/ca.crt + - --enable-admission-plugins=NodeRestriction + - --enable-bootstrap-token-auth=true + - --etcd-cafile=/etc/etcd/pki/etcd-ca.crt + - --etcd-certfile=/etc/etcd/pki/etcd-client.crt + - --etcd-keyfile=/etc/etcd/pki/etcd-client.key + #- --etcd-servers=https://{{ .EtcdClientService }}.{{ .Namespace }}.svc.cluster.local:{{ .EtcdListenClientPort }} + - --etcd-servers=https://{{ .EtcdClientService }}:{{ .EtcdListenClientPort }} + - --bind-address=0.0.0.0 + - --kubelet-client-certificate=/etc/virtualcluster/pki/virtualCluster.crt + - --kubelet-client-key=/etc/virtualcluster/pki/virtualCluster.key + - --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname + - --secure-port={{ .ClusterPort }} + - --service-account-issuer=https://kubernetes.default.svc.cluster.local + - --service-account-key-file=/etc/virtualcluster/pki/virtualCluster.key + - --service-account-signing-key-file=/etc/virtualcluster/pki/virtualCluster.key + - --service-cluster-ip-range={{ .ServiceSubnet }} + - --proxy-client-cert-file=/etc/virtualcluster/pki/front-proxy-client.crt + - --proxy-client-key-file=/etc/virtualcluster/pki/front-proxy-client.key + - 
--requestheader-allowed-names=front-proxy-client + - --requestheader-client-ca-file=/etc/virtualcluster/pki/front-proxy-ca.crt + - --requestheader-extra-headers-prefix=X-Remote-Extra- + - --requestheader-group-headers=X-Remote-Group + - --requestheader-username-headers=X-Remote-User + - --tls-cert-file=/etc/virtualcluster/pki/apiserver.crt + - --tls-private-key-file=/etc/virtualcluster/pki/apiserver.key + - --tls-min-version=VersionTLS13 + - --max-requests-inflight=1500 + - --max-mutating-requests-inflight=500 + - --v=4 + - --advertise-address=$(PODIP) + - --egress-selector-config-file=/etc/kubernetes/konnectivity-server-config/{{ .Namespace }}/{{ .Name }}/egress_selector_configuration.yaml + livenessProbe: + failureThreshold: 8 + httpGet: + path: /livez + port: {{ .ClusterPort }} + scheme: HTTPS + initialDelaySeconds: 10 + periodSeconds: 10 + successThreshold: 1 + timeoutSeconds: 15 + readinessProbe: + failureThreshold: 3 + httpGet: + path: /readyz + port: {{ .ClusterPort }} + scheme: HTTPS + initialDelaySeconds: 10 + periodSeconds: 10 + successThreshold: 1 + timeoutSeconds: 15 + ports: + - containerPort: {{ .ClusterPort }} + name: http + protocol: TCP + volumeMounts: + - mountPath: /etc/virtualcluster/pki + name: apiserver-cert + readOnly: true + - mountPath: /etc/etcd/pki + name: etcd-cert + readOnly: true + - mountPath: /etc/kubernetes/konnectivity-server/{{ .Namespace }}/{{ .Name }} + readOnly: false + name: konnectivity-uds + - name: kas-proxy + mountPath: /etc/kubernetes/konnectivity-server-config/{{ .Namespace }}/{{ .Name }}/egress_selector_configuration.yaml + subPath: egress_selector_configuration.yaml + - name: konnectivity-server-container + image: {{ .ImageRepository }}/kas-network-proxy-server:{{ .Version }} + resources: + requests: + cpu: 1m + securityContext: + allowPrivilegeEscalation: false + runAsUser: 0 + command: [ "/proxy-server"] + args: [ + "--log-file=/var/log/konnectivity-server.log", + "--logtostderr=true", + "--log-file-max-size=0", + "--uds-name=/etc/kubernetes/konnectivity-server/{{ .Namespace }}/{{ .Name }}/konnectivity-server.socket", + "--delete-existing-uds-file", + "--cluster-cert=/etc/virtualcluster/pki/apiserver.crt", + "--cluster-key=/etc/virtualcluster/pki/apiserver.key", + "--server-port=0", + "--agent-port={{ .AgentPort }}", + "--health-port={{ .HealthPort }}", + "--admin-port={{ .AdminPort }}", + "--keepalive-time=1h", + "--mode=grpc", + "--agent-namespace=kube-system", + "--agent-service-account=konnectivity-agent", + "--kubeconfig=/etc/apiserver/kubeconfig", + "--authentication-audience=system:konnectivity-server", + ] + livenessProbe: + httpGet: + scheme: HTTP + host: 127.0.0.1 + port: {{ .HealthPort }} + path: /healthz + initialDelaySeconds: 10 + timeoutSeconds: 60 + ports: + - name: serverport + containerPort: {{ .ServerPort }} + hostPort: {{ .ServerPort }} + - name: agentport + containerPort: {{ .AgentPort }} + hostPort: {{ .AgentPort }} + - name: healthport + containerPort: {{ .HealthPort }} + hostPort: {{ .HealthPort }} + - name: adminport + containerPort: {{ .AdminPort }} + hostPort: {{ .AdminPort }} + volumeMounts: + - mountPath: /etc/virtualcluster/pki + name: apiserver-cert + readOnly: true + - name: varlogkonnectivityserver + mountPath: /var/log/konnectivity-server.log + readOnly: false + - name: konnectivity-home + mountPath: /etc/kubernetes/konnectivity-server/{{ .Namespace }}/{{ .Name }} + - mountPath: /etc/apiserver/kubeconfig + name: kubeconfig + subPath: kubeconfig + priorityClassName: system-node-critical + volumes: + - name: 
kubeconfig + secret: + defaultMode: 420 + secretName: {{ .KubeconfigSecret }} + - name: varlogkonnectivityserver + hostPath: + path: /var/log/{{ .Namespace }}/{{ .Name }}/konnectivity-server.log + type: FileOrCreate + - name: konnectivity-home + hostPath: + path: /etc/kubernetes/konnectivity-server/{{ .Namespace }}/{{ .Name }} + type: DirectoryOrCreate + - name: apiserver-cert + secret: + secretName: {{ .VirtualClusterCertsSecret }} + - name: etcd-cert + secret: + secretName: {{ .EtcdCertsSecret }} + - name: konnectivity-uds + hostPath: + path: /etc/kubernetes/konnectivity-server/{{ .Namespace }}/{{ .Name }} + type: DirectoryOrCreate + - name: kas-proxy + configMap: + name: kas-proxy-files +` + AnpAgentManifest = ` +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:konnectivity-server + labels: + kubernetes.io/cluster-service: "true" +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:auth-delegator +subjects: + - apiGroup: rbac.authorization.k8s.io + kind: User + name: system:konnectivity-server +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: konnectivity-agent + namespace: kube-system + labels: + kubernetes.io/cluster-service: "true" +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + labels: + k8s-app: konnectivity-agent + namespace: kube-system + name: konnectivity-agent +spec: + selector: + matchLabels: + k8s-app: konnectivity-agent + updateStrategy: + type: RollingUpdate + template: + metadata: + labels: + k8s-app: konnectivity-agent + spec: + priorityClassName: system-cluster-critical + tolerations: + - key: "CriticalAddonsOnly" + operator: "Exists" + - operator: "Exists" + effect: "NoExecute" + nodeSelector: + kubernetes.io/os: linux + dnsPolicy: ClusterFirstWithHostNet + containers: + - name: konnectivity-agent-container + image: {{ .ImageRepository }}/kas-network-proxy-agent:{{ .Version }} + resources: + requests: + cpu: 50m + limits: + memory: 30Mi + command: [ "/proxy-agent"] + args: [ + "--logtostderr=true", + "--ca-cert=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + "--proxy-server-host=konnectivity-server.kube-system.svc.cluster.local", + "--proxy-server-port={{ .AgentPort }}", + "--sync-interval=5s", + "--sync-interval-cap=30s", + "--probe-interval=5s", + "--service-account-token-path=/var/run/secrets/tokens/konnectivity-agent-token", + "--agent-identifiers=ipv4=$(HOST_IP)" + ] + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + livenessProbe: + httpGet: + scheme: HTTP + port: 8093 + path: /healthz + initialDelaySeconds: 15 + timeoutSeconds: 15 + readinessProbe: + httpGet: + scheme: HTTP + port: 8093 + path: /readyz + initialDelaySeconds: 15 + timeoutSeconds: 15 + volumeMounts: + - mountPath: /var/run/secrets/tokens + name: konnectivity-agent-token + serviceAccountName: konnectivity-agent + volumes: + - name: konnectivity-agent-token + projected: + sources: + - serviceAccountToken: + path: konnectivity-agent-token + audience: system:konnectivity-server +--- +apiVersion: v1 +kind: Endpoints +metadata: + name: konnectivity-server + namespace: kube-system +subsets: + - addresses: + {{- range .ProxyServerHost }} + - ip: {{ . 
}} + {{- end }} + ports: + - port: {{ .AgentPort }} +--- +apiVersion: v1 +kind: Service +metadata: + name: konnectivity-server + namespace: kube-system +spec: + ports: + - port: {{ .AgentPort }} + targetPort: {{ .AgentPort }} ` ) diff --git a/pkg/kubenest/manifest/controlplane/apiserver/manifeats_configmap.go b/pkg/kubenest/manifest/controlplane/apiserver/manifeats_configmap.go new file mode 100644 index 000000000..b69a97522 --- /dev/null +++ b/pkg/kubenest/manifest/controlplane/apiserver/manifeats_configmap.go @@ -0,0 +1,28 @@ +package apiserver + +const ( + EgressSelectorConfiguration = ` +apiVersion: v1 +data: + egress_selector_configuration.yaml: | + apiVersion: apiserver.k8s.io/v1beta1 + kind: EgressSelectorConfiguration + egressSelections: + - name: cluster + connection: + proxyProtocol: GRPC + transport: + uds: + udsName: /etc/kubernetes/konnectivity-server/{{ .Namespace }}/{{ .Name }}/konnectivity-server.socket + - name: master + connection: + proxyProtocol: Direct + - name: etcd + connection: + proxyProtocol: Direct +kind: ConfigMap +metadata: + name: kas-proxy-files + namespace: {{ .Namespace }} +` +) diff --git a/pkg/kubenest/tasks/anp.go b/pkg/kubenest/tasks/anp.go new file mode 100644 index 000000000..f22405285 --- /dev/null +++ b/pkg/kubenest/tasks/anp.go @@ -0,0 +1,261 @@ +package tasks + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/dynamic" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +func NewAnpTask() workflow.Task { + return workflow.Task{ + Name: "anp", + Run: runAnp, + RunSubTasks: true, + Tasks: []workflow.Task{ + { + Name: "deploy-anp-server", + Run: runAnpServer, + }, + { + Name: "deploy-anp-agent", + Run: runAnpAgent, + }, + }, + } +} + +func runAnp(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("anp task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[anp] Running anp task", "virtual cluster", klog.KObj(data)) + return nil +} + +func runAnpServer(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("Virtual cluster anp task invoked with an invalid data struct") + } + // install egress_selector_configuration config map + egressSelectorConfig, err := util.ParseTemplate(apiserver.EgressSelectorConfiguration, struct { + Namespace string + }{ + Namespace: data.GetNamespace(), + }) + if err != nil { + return fmt.Errorf("failed to parse egress_selector_configuration config map template, err: %w", err) + } + cm := &v1.ConfigMap{} + err = yaml.Unmarshal([]byte(egressSelectorConfig), cm) + if err != nil { + return fmt.Errorf("failed to parse egress_selector_configuration config map template, err: %w", err) + } + // create configMap + err = util.CreateOrUpdateConfigMap(data.RemoteClient(), cm) + if err != nil { + return fmt.Errorf("failed to create egress_selector_configuration config map, err: %w", err) + } + err = installAnpServer(data.RemoteClient(), data.GetName(), data.GetNamespace(), data.HostPortMap()) + if err != nil { + return fmt.Errorf("failed to install virtual 
cluster anp component, err: %w", err) + } + + klog.V(2).InfoS("[VirtualClusterAnp] Successfully installed virtual cluster anp component", "virtual cluster", klog.KObj(data)) + return nil +} + +func runAnpAgent(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("check-VirtualClusterAnp task invoked with an invalid data struct") + } + return installAnpAgent(data.RemoteClient(), data.GetName(), data.GetNamespace(), data.HostPortMap()) +} + +func UninstallAnpTask() workflow.Task { + return workflow.Task{ + Name: "anp", + Run: runAnp, + RunSubTasks: true, + Tasks: []workflow.Task{ + { + Name: "anp", + Run: uninstallAnp, + }, + }, + } +} + +func uninstallAnp(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("Virtual cluster anp task invoked with an invalid data struct") + } + + anpManifest, vcClient, err := getAnpAgentManifest(data.RemoteClient(), data.GetName(), data.GetNamespace(), data.HostPortMap()) + if err != nil { + return fmt.Errorf("failed to uninstall anp agent when get anp manifest, err: %w", err) + } + actionFunc := func(ctx context.Context, c dynamic.Interface, u *unstructured.Unstructured) error { + // create the object + return util.DeleteObject(vcClient, u.GetNamespace(), u.GetName(), u) + } + + klog.V(2).InfoS("[VirtualClusterAnp] Successfully uninstalled virtual cluster anp component", "virtual cluster", klog.KObj(data)) + return util.ForEachObjectInYAML(context.TODO(), vcClient, []byte(anpManifest), "", actionFunc) +} +func installAnpServer(client clientset.Interface, name, namespace string, portMap map[string]int32) error { + imageRepository, imageVersion := util.GetImageMessage() + clusterIp, err := util.GetEtcdServiceClusterIp(namespace, name+constants.EtcdSuffix, client) + if err != nil { + return nil + } + + apiserverDeploymentBytes, err := util.ParseTemplate(apiserver.ApiserverAnpDeployment, struct { + DeploymentName, Namespace, ImageRepository, EtcdClientService, Version string + ServiceSubnet, VirtualClusterCertsSecret, EtcdCertsSecret string + Replicas int32 + EtcdListenClientPort int32 + ClusterPort int32 + AgentPort int32 + ServerPort int32 + HealthPort int32 + AdminPort int32 + KubeconfigSecret string + Name string + }{ + DeploymentName: fmt.Sprintf("%s-%s", name, "apiserver"), + Namespace: namespace, + ImageRepository: imageRepository, + Version: imageVersion, + EtcdClientService: clusterIp, + ServiceSubnet: constants.ApiServerServiceSubnet, + VirtualClusterCertsSecret: fmt.Sprintf("%s-%s", name, "cert"), + EtcdCertsSecret: fmt.Sprintf("%s-%s", name, "etcd-cert"), + Replicas: constants.ApiServerReplicas, + EtcdListenClientPort: constants.ApiServerEtcdListenClientPort, + ClusterPort: portMap[constants.ApiServerPortKey], + AgentPort: portMap[constants.ApiServerNetworkProxyAgentPortKey], + ServerPort: portMap[constants.ApiServerNetworkProxyServerPortKey], + HealthPort: portMap[constants.ApiServerNetworkProxyHealthPortKey], + AdminPort: portMap[constants.ApiServerNetworkProxyAdminPortKey], + KubeconfigSecret: fmt.Sprintf("%s-%s", name, "admin-config-clusterip"), + Name: name, + }) + if err != nil { + return fmt.Errorf("error when parsing virtual cluster apiserver deployment template: %w", err) + } + klog.V(4).InfoS("[anp] apply anp server", "anp sever deploy", apiserverDeploymentBytes) + + apiserverDeployment := &appsv1.Deployment{} + if err := yaml.Unmarshal([]byte(apiserverDeploymentBytes), apiserverDeployment); err != nil { + return fmt.Errorf("error when decoding virtual cluster apiserver 
deployment: %w", err) + } + + if err := util.CreateOrUpdateDeployment(client, apiserverDeployment); err != nil { + return fmt.Errorf("error when creating deployment for %s, err: %w", apiserverDeployment.Name, err) + } + return nil +} + +func installAnpAgent(client clientset.Interface, name, namespace string, portMap map[string]int32) error { + anpAgentManifestBytes, vcClient, err2 := getAnpAgentManifest(client, name, namespace, portMap) + if err2 != nil { + return err2 + } + actionFunc := func(ctx context.Context, c dynamic.Interface, u *unstructured.Unstructured) error { + // create the object + return util.ApplyObject(vcClient, u) + } + return util.ForEachObjectInYAML(context.TODO(), vcClient, []byte(anpAgentManifestBytes), "", actionFunc) +} + +func getAnpAgentManifest(client clientset.Interface, name string, namespace string, portMap map[string]int32) (string, dynamic.Interface, error) { + imageRepository, imageVersion := util.GetImageMessage() + // get apiServer hostIp + proxyServerHost, err := getDeploymentPodIPs(client, namespace, fmt.Sprintf("%s-%s", name, "apiserver")) + if err != nil { + return "", nil, fmt.Errorf("error when get apiserver hostIp, err: %w", err) + } + + anpAgentManifeattBytes, err := util.ParseTemplate(apiserver.AnpAgentManifest, struct { + ImageRepository string + Version string + AgentPort int32 + ProxyServerHost []string + }{ + ImageRepository: imageRepository, + Version: imageVersion, + AgentPort: portMap[constants.ApiServerNetworkProxyAgentPortKey], + ProxyServerHost: proxyServerHost, + }) + if err != nil { + return "", nil, fmt.Errorf("error when parsing virtual cluster apiserver deployment template: %w", err) + } + klog.V(4).InfoS("[anp] apply anp agent", "agent manifest", anpAgentManifeattBytes) + vcClient, err := getVcClient(client, name, namespace) + if err != nil { + return "", nil, fmt.Errorf("error when get vcClient, err: %v", err) + } + return anpAgentManifeattBytes, vcClient, nil +} + +// getDeploymentPodIPs 获取指定 Deployment 的所有 Pod IP 地址 +func getDeploymentPodIPs(clientset clientset.Interface, namespace, deploymentName string) ([]string, error) { + deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("error getting deployment: %v", err) + } + + labelSelector := metav1.FormatLabelSelector(deployment.Spec.Selector) + listOptions := metav1.ListOptions{LabelSelector: labelSelector} + + pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions) + if err != nil { + return nil, fmt.Errorf("error listing pods: %v", err) + } + + var podIPs []string + for _, pod := range pods.Items { + if pod.Status.Phase == v1.PodRunning { + podIPs = append(podIPs, pod.Status.PodIP) + } + } + + return podIPs, nil +} + +func getVcClient(client clientset.Interface, name, namespace string) (dynamic.Interface, error) { + secret, err := client.CoreV1().Secrets(namespace).Get(context.TODO(), + fmt.Sprintf("%s-%s", name, constants.AdminConfig), metav1.GetOptions{}) + if err != nil { + return nil, errors.Wrap(err, "Get virtualcluster kubeconfig secret error") + } + config, err := clientcmd.RESTConfigFromKubeConfig(secret.Data[constants.KubeConfig]) + if err != nil { + return nil, err + } + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, err + } + return dynamicClient, nil +} diff --git a/pkg/kubenest/tasks/apiserver.go b/pkg/kubenest/tasks/apiserver.go index 4c077e8e2..93f803f47 100644 --- 
a/pkg/kubenest/tasks/apiserver.go +++ b/pkg/kubenest/tasks/apiserver.go @@ -50,7 +50,7 @@ func runVirtualClusterAPIServer(r workflow.RunData) error { data.RemoteClient(), data.GetName(), data.GetNamespace(), - data.HostPortMap()[constants.ApiServerPortKey], + data.HostPortMap(), ) if err != nil { return fmt.Errorf("failed to install virtual cluster apiserver component, err: %w", err) diff --git a/pkg/kubenest/tasks/service.go b/pkg/kubenest/tasks/service.go index eb10e3a79..6fb6a99ea 100644 --- a/pkg/kubenest/tasks/service.go +++ b/pkg/kubenest/tasks/service.go @@ -3,10 +3,10 @@ package tasks import ( "errors" "fmt" - "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" ) diff --git a/pkg/kubenest/util/cert/certs.go b/pkg/kubenest/util/cert/certs.go index 9c6353657..9aa7f40a3 100644 --- a/pkg/kubenest/util/cert/certs.go +++ b/pkg/kubenest/util/cert/certs.go @@ -195,6 +195,7 @@ func apiServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, e "kubernetes", "kubernetes.default", "kubernetes.default.svc", + "konnectivity-server.kube-system.svc.cluster.local", fmt.Sprintf("*.%s.svc.cluster.local", constants.VirtualClusterSystemNamespace), fmt.Sprintf("*.%s.svc", constants.VirtualClusterSystemNamespace), }, diff --git a/pkg/kubenest/util/helper.go b/pkg/kubenest/util/helper.go index 78f4540a3..fc9c3da72 100644 --- a/pkg/kubenest/util/helper.go +++ b/pkg/kubenest/util/helper.go @@ -1,18 +1,27 @@ package util import ( + "bufio" + "bytes" "context" + "fmt" + "io" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + utilyaml "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "sigs.k8s.io/yaml" ) func CreateOrUpdateDeployment(client clientset.Interface, deployment *appsv1.Deployment) error { @@ -199,6 +208,45 @@ func CreateObject(dynamicClient dynamic.Interface, namespace string, name string return nil } +func ApplyObject(dynamicClient dynamic.Interface, obj *unstructured.Unstructured) error { + gvk := obj.GroupVersionKind() + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + namespace := obj.GetNamespace() + name := obj.GetName() + + klog.V(2).Infof("Apply %s, name: %s, namespace: %s", gvr.String(), name, namespace) + + resourceClient := dynamicClient.Resource(gvr).Namespace(namespace) + + // Get the existing resource + existingObj, err := resourceClient.Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + // If not found, create the resource + _, err = resourceClient.Create(context.TODO(), obj, metav1.CreateOptions{}) + if err != nil { + return err + } + klog.V(2).Infof("Created %s %s in namespace %s", gvr.String(), name, namespace) + return nil + } + return err + } + + // If found, apply changes using Server-Side Apply + obj.SetResourceVersion(existingObj.GetResourceVersion()) + _, err = resourceClient.Apply(context.TODO(), name, obj, metav1.ApplyOptions{ + FieldManager: "vc-operator-manager", + Force: true, + }) + if err != nil { + return fmt.Errorf("failed to apply changes to %s 
%s: %v", gvr.String(), name, err) + } + + klog.V(2).Infof("Applied changes to %s %s in namespace %s", gvr.String(), name, namespace) + return nil +} + func DeleteObject(dynamicClient dynamic.Interface, namespace string, name string, obj *unstructured.Unstructured) error { gvk := obj.GroupVersionKind() gvr, _ := meta.UnsafeGuessKindToResource(gvk) @@ -214,3 +262,102 @@ func DeleteObject(dynamicClient dynamic.Interface, namespace string, name string } return nil } + +// DecodeYAML unmarshals a YAML document or multidoc YAML as unstructured +// objects, placing each decoded object into a channel. +// code from https://github.com/kubernetes/client-go/issues/216 +func DecodeYAML(data []byte) (<-chan *unstructured.Unstructured, <-chan error) { + var ( + chanErr = make(chan error) + chanObj = make(chan *unstructured.Unstructured) + multidocReader = utilyaml.NewYAMLReader(bufio.NewReader(bytes.NewReader(data))) + ) + + go func() { + defer close(chanErr) + defer close(chanObj) + + // Iterate over the data until Read returns io.EOF. Every successful + // read returns a complete YAML document. + for { + buf, err := multidocReader.Read() + if err != nil { + if err == io.EOF { + return + } + klog.Warningf("failed to read yaml data") + chanErr <- errors.Wrap(err, "failed to read yaml data") + return + } + + // Do not use this YAML doc if it is unkind. + var typeMeta runtime.TypeMeta + if err := yaml.Unmarshal(buf, &typeMeta); err != nil { + continue + } + if typeMeta.Kind == "" { + continue + } + + // Define the unstructured object into which the YAML document will be + // unmarshaled. + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{}, + } + + // Unmarshal the YAML document into the unstructured object. + if err := yaml.Unmarshal(buf, &obj.Object); err != nil { + klog.Warningf("failed to unmarshal yaml data") + chanErr <- errors.Wrap(err, "failed to unmarshal yaml data") + return + } + + // Place the unstructured object into the channel. + chanObj <- obj + } + }() + + return chanObj, chanErr +} + +// ForEachObjectInYAMLActionFunc is a function that is executed against each +// object found in a YAML document. +// When a non-empty namespace is provided then the object is assigned the +// namespace prior to any other actions being performed with or to the object. +type ForEachObjectInYAMLActionFunc func(context.Context, dynamic.Interface, *unstructured.Unstructured) error + +// ForEachObjectInYAML excutes actionFn for each object in the provided YAML. +// If an error is returned then no further objects are processed. +// The data may be a single YAML document or multidoc YAML. +// When a non-empty namespace is provided then all objects are assigned the +// the namespace prior to any other actions being performed with or to the +// object. +func ForEachObjectInYAML( + ctx context.Context, + dynamicClient dynamic.Interface, + data []byte, + namespace string, + actionFn ForEachObjectInYAMLActionFunc) error { + chanObj, chanErr := DecodeYAML(data) + for { + select { + case obj := <-chanObj: + if obj == nil { + return nil + } + if namespace != "" { + obj.SetNamespace(namespace) + } + klog.Infof("get object %s/%s", obj.GetNamespace(), obj.GetName()) + if err := actionFn(ctx, dynamicClient, obj); err != nil { + return err + } + case err := <-chanErr: + if err == nil { + return nil + } + klog.Errorf("DecodeYaml error %v", err) + return errors.Wrap(err, "received error while decoding yaml") + } + } +}