diff --git a/cmd/kubenest/node-agent/app/client/client.go b/cmd/kubenest/node-agent/app/client/client.go
index eda4fbcca..9f3ec64f3 100644
--- a/cmd/kubenest/node-agent/app/client/client.go
+++ b/cmd/kubenest/node-agent/app/client/client.go
@@ -60,7 +60,7 @@ var (
 	}
 	wg sync.WaitGroup
 
-	wsAddr   []string // websocket client connect address list
+	WsAddr   []string // websocket client connect address list
 	filePath string   // the server path to save upload file
 	fileName string   // local file to upload
 	params   []string // New slice to hold multiple command parameters
@@ -79,7 +79,7 @@ func cmdCheckRun(cmd *cobra.Command, args []string) error {
 	headers := http.Header{
 		"Authorization": {"Basic " + auth},
 	}
-	for _, addr := range wsAddr {
+	for _, addr := range WsAddr {
 		wg.Add(1)
 		go func(addr string) {
 			defer wg.Done()
@@ -106,15 +106,11 @@ func init() {
 	// #nosec G402
 	dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
 
-	ClientCmd.PersistentFlags().StringSliceVarP(&wsAddr, "addr", "a", []string{}, "WebSocket address (e.g., host1:port1,host2:port2)")
-	err := ClientCmd.MarkPersistentFlagRequired("addr")
-	if err != nil {
-		return
-	}
+	ClientCmd.PersistentFlags().StringSliceVarP(&WsAddr, "addr", "a", []string{}, "WebSocket address (e.g., host1:port1,host2:port2)")
 
 	// PreRunE check param
 	ClientCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
-		for _, value := range wsAddr {
+		for _, value := range WsAddr {
 			if _, exists := uniqueValuesMap[value]; exists {
 				return errors.New("duplicate values are not allowed")
 			}
@@ -137,7 +133,7 @@ func init() {
 	_ = uploadCmd.MarkFlagRequired("path")
 
 	ttyCmd.Flags().StringVarP(&operation, "operation", "o", "", "Operation to perform")
-	err = ttyCmd.MarkFlagRequired("operation") // Ensure 'operation' flag is required for ttyCmd
+	err := ttyCmd.MarkFlagRequired("operation") // Ensure 'operation' flag is required for ttyCmd
 	if err != nil {
 		return
 	}
@@ -157,7 +153,7 @@ func cmdTtyRun(cmd *cobra.Command, args []string) error {
 	}
 	cmdStr := fmt.Sprintf("command=%s", operation)
 	// execute one every wsAddr
-	for _, addr := range wsAddr {
+	for _, addr := range WsAddr {
 		wsURL := fmt.Sprintf("wss://%s/tty/?%s", addr, cmdStr)
 		fmt.Println("Executing tty:", cmdStr, "on", addr)
 		err := connectTty(wsURL, headers)
@@ -294,7 +290,7 @@ func executeWebSocketCommand(auth string) error {
 	}
 
 	// execute one every wsAddr
-	for _, addr := range wsAddr {
+	for _, addr := range WsAddr {
 		wg.Add(1)
 		go func(addr string) {
 			defer wg.Done()
@@ -314,7 +310,7 @@ func uploadFile(filePath, fileName, auth string) error {
 	headers := http.Header{
 		"Authorization": {"Basic " + auth},
 	}
-	for _, addr := range wsAddr {
+	for _, addr := range WsAddr {
 		wg.Add(1)
 		go func(addr string) {
			defer wg.Done()
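With `wsAddr` exported as `WsAddr` and `MarkPersistentFlagRequired("addr")` dropped, the address list no longer has to come from the command line; it can be injected by the viper binding added in `root.go` below. A minimal, self-contained sketch (hypothetical `demo` command, not code from this patch) of that flag-plus-env resolution:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

func main() {
	cmd := &cobra.Command{Use: "demo", Run: func(*cobra.Command, []string) {}}
	cmd.PersistentFlags().StringSlice("addr", nil, "WebSocket address list")

	v := viper.New()
	_ = v.BindPFlag("addr", cmd.PersistentFlags().Lookup("addr")) // explicit --addr wins when set
	_ = v.BindEnv("addr", "ADDR")                                 // otherwise fall back to $ADDR

	_ = cmd.Execute()
	fmt.Println(v.GetStringSlice("addr")) // e.g. ADDR=host1:5678 ./demo
}
```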
diff --git a/cmd/kubenest/node-agent/app/root.go b/cmd/kubenest/node-agent/app/root.go
index 69c6dc648..aeb9d2d59 100644
--- a/cmd/kubenest/node-agent/app/root.go
+++ b/cmd/kubenest/node-agent/app/root.go
@@ -2,6 +2,7 @@ package app
 
 import (
 	"os"
+	"strings"
 
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
@@ -33,6 +34,7 @@ func initConfig() {
 	currentDir, _ := os.Getwd()
 	viper.AddConfigPath(currentDir)
 	viper.AddConfigPath("/srv/node-agent/agent.env")
+	viper.SetConfigType("toml")
 	// If a agent.env file is found, read it in.
 	if err := viper.ReadInConfig(); err != nil {
 		log.Warnf("Load config file error, %s", err)
@@ -46,8 +48,27 @@ func initConfig() {
 	}
 }
 
+func initWebSocketAddr() {
+	err := viper.BindPFlag("ADDR", client.ClientCmd.PersistentFlags().Lookup("addr"))
+	if err != nil {
+		log.Fatalf("Failed to bind flag: %v", err)
+		return
+	}
+	err = viper.BindEnv("ADDR", "ADDR")
+	if err != nil {
+		log.Fatalf("Failed to bind env: %v", err)
+		return
+	}
+	// Initialize addr value from viper
+	log.Infof(strings.Join(viper.AllKeys(), ","))
+	if viper.Get("addr") != nil {
+		client.WsAddr = viper.GetStringSlice("addr")
+		log.Infof("addr: %v", client.WsAddr)
+	}
+}
+
 func init() {
-	cobra.OnInitialize(initConfig)
+	cobra.OnInitialize(initConfig, initWebSocketAddr)
 
 	RootCmd.PersistentFlags().StringVarP(&user, "user", "u", "", "Username for authentication")
 	RootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password for authentication")
diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml
index f5912ba06..094ff1179 100644
--- a/deploy/crds/kosmos.io_virtualclusters.yaml
+++ b/deploy/crds/kosmos.io_virtualclusters.yaml
@@ -53,6 +53,12 @@ spec:
                 items:
                   type: string
                 type: array
+              externalVips:
+                description: ExternalVips are the external VIPs of the virtual kubernetes's
+                  control plane
+                items:
+                  type: string
+                type: array
               kubeconfig:
                 description: Kubeconfig is the kubeconfig of the virtual kubernetes's
                   control plane
@@ -208,6 +214,10 @@ spec:
             updateTime:
               format: date-time
              type: string
+            vipMap:
+              additionalProperties:
+                type: string
+              type: object
           type: object
         required:
         - spec
diff --git a/deploy/virtual-cluster-components-manifest-cm.yaml b/deploy/virtual-cluster-components-manifest-cm.yaml
index 449040b5a..60f69a752 100644
--- a/deploy/virtual-cluster-components-manifest-cm.yaml
+++ b/deploy/virtual-cluster-components-manifest-cm.yaml
@@ -4,6 +4,7 @@ data:
     [
       {"name": "kube-proxy", "path": "/kosmos/manifest/kube-proxy/*.yaml"},
       {"name": "calico", "path": "/kosmos/manifest/calico/*.yaml"},
+      {"name": "keepalived", "path": "/kosmos/manifest/keepalived/*.yaml"},
     ]
   host-core-dns-components: |
     [
diff --git a/deploy/virtual-cluster-vip-pool-cm.yaml b/deploy/virtual-cluster-vip-pool-cm.yaml
new file mode 100644
index 000000000..940c63ab5
--- /dev/null
+++ b/deploy/virtual-cluster-vip-pool-cm.yaml
@@ -0,0 +1,11 @@
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: kosmos-vip-pool
+  namespace: kosmos-system
+data:
+  vip-config.yaml: |
+    # can be used for vc; the IP format is 192.168.0.1 or 192.168.0.2-192.168.0.10
+    vipPool:
+      - 192.168.0.1-192.168.0.10
\ No newline at end of file
diff --git a/hack/node-agent/init.sh b/hack/node-agent/init.sh
index 3a0609861..cc522968e 100644
--- a/hack/node-agent/init.sh
+++ b/hack/node-agent/init.sh
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
-WEB_USER="$WEB_USER" sed -i 's/^WEB_USER=.*/WEB_USER='"$WEB_USER"'/' /app/agent.env
-WEB_PASS="$WEB_PASS" sed -i 's/^WEB_PASS=.*/WEB_PASS='"$WEB_PASS"'/' /app/agent.env
+WEB_USER="$WEB_USER" sed -i 's/^WEB_USER=.*/WEB_USER="'"$WEB_USER"'"/' /app/agent.env
+WEB_PASS="$WEB_PASS" sed -i 's/^WEB_PASS=.*/WEB_PASS="'"$WEB_PASS"'"/' /app/agent.env
 sha256sum /app/node-agent > /app/node-agent.sum
 sha256sum /host-path/node-agent >> /app/node-agent.sum
 rsync -avz /app/ /host-path/
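`initConfig` now parses `agent.env` as TOML, which is why `init.sh` starts wrapping the substituted values in double quotes: a bare `WEB_PASS=p@ss!` line is not a valid TOML string. A small sketch (config content assumed for illustration) of the difference:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	v.SetConfigType("toml")

	// Quoted, as init.sh now writes it — parses as a TOML string.
	good := []byte("WEB_USER = \"kosmos\"\nWEB_PASS = \"p@ss!\"\n")
	if err := v.ReadConfig(bytes.NewBuffer(good)); err != nil {
		fmt.Println("unexpected:", err)
	}
	fmt.Println(v.GetString("WEB_USER")) // kosmos

	// Unquoted, as before this patch — rejected by the TOML parser.
	bad := []byte("WEB_PASS = p@ss!\n")
	fmt.Println(v.ReadConfig(bytes.NewBuffer(bad)) != nil) // true
}
```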
diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
index 96f252d96..e5329562e 100644
--- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
+++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
@@ -56,6 +56,10 @@ type VirtualClusterSpec struct {
 	// +optional
 	ExternalIps []string `json:"externalIps,omitempty"`
 
+	// ExternalVips are the external VIPs of the virtual kubernetes's control plane
+	// +optional
+	ExternalVips []string `json:"externalVips,omitempty"`
+
 	// PromotePolicies definites the policies for promote to the kubernetes's control plane
 	// +required
 	PromotePolicies []PromotePolicy `json:"promotePolicies,omitempty"`
@@ -139,6 +143,8 @@ type VirtualClusterStatus struct {
 	Port int32 `json:"port,omitempty"`
 	// +optional
 	PortMap map[string]int32 `json:"portMap,omitempty"`
+	// +optional
+	VipMap map[string]string `json:"vipMap,omitempty"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go
index 1ed23db65..435074636 100644
--- a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go
@@ -1938,6 +1938,11 @@ func (in *VirtualClusterSpec) DeepCopyInto(out *VirtualClusterSpec) {
 		*out = make([]string, len(*in))
 		copy(*out, *in)
 	}
+	if in.ExternalVips != nil {
+		in, out := &in.ExternalVips, &out.ExternalVips
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
 	if in.PromotePolicies != nil {
 		in, out := &in.PromotePolicies, &out.PromotePolicies
 		*out = make([]PromotePolicy, len(*in))
@@ -1979,6 +1984,13 @@ func (in *VirtualClusterStatus) DeepCopyInto(out *VirtualClusterStatus) {
 			(*out)[key] = val
 		}
 	}
+	if in.VipMap != nil {
+		in, out := &in.VipMap, &out.VipMap
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
 	return
 }
diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go
index 7d76aa557..512dc754d 100644
--- a/pkg/generated/openapi/zz_generated.openapi.go
+++ b/pkg/generated/openapi/zz_generated.openapi.go
@@ -3382,6 +3382,21 @@ func schema_pkg_apis_kosmos_v1alpha1_VirtualClusterSpec(ref common.ReferenceCall
 					},
 				},
 			},
+			"externalVips": {
+				SchemaProps: spec.SchemaProps{
+					Description: "ExternalVips are the external VIPs of the virtual kubernetes's control plane",
+					Type:        []string{"array"},
+					Items: &spec.SchemaOrArray{
+						Schema: &spec.Schema{
+							SchemaProps: spec.SchemaProps{
+								Default: "",
+								Type:    []string{"string"},
+								Format:  "",
+							},
+						},
+					},
+				},
+			},
 			"promotePolicies": {
 				SchemaProps: spec.SchemaProps{
 					Description: "PromotePolicies definites the policies for promote to the kubernetes's control plane",
@@ -3477,6 +3492,21 @@ func schema_pkg_apis_kosmos_v1alpha1_VirtualClusterStatus(ref common.ReferenceCa
 					},
 				},
 			},
+			"vipMap": {
+				SchemaProps: spec.SchemaProps{
+					Type: []string{"object"},
+					AdditionalProperties: &spec.SchemaOrBool{
+						Allows: true,
+						Schema: &spec.Schema{
+							SchemaProps: spec.SchemaProps{
+								Default: "",
+								Type:    []string{"string"},
+								Format:  "",
+							},
+						},
+					},
+				},
+			},
 		},
 	},
 },
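With the new spec field and status map, a tenant can pin a VIP up front; otherwise the controller picks one from the pool and records it in `status.vipMap` under the `vip-key` key (the `constants.VcVipStatusKey` value defined below). A sketch of a populated object, using only field names from the types above:

```go
package main

import (
	"fmt"

	"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

func main() {
	vc := v1alpha1.VirtualCluster{
		Spec: v1alpha1.VirtualClusterSpec{
			// Optional: pin a VIP; it must fall inside the kosmos-vip-pool ranges.
			ExternalVips: []string{"192.168.0.4"},
		},
	}
	// After allocation the controller records the chosen address:
	vc.Status.VipMap = map[string]string{"vip-key": "192.168.0.4"}
	fmt.Println(vc.Status.VipMap)
}
```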
= "kosmos.io/keepalived-role" + VipKeepAlivedNodeRoleMaster = "master" + VipKeepalivedNodeRoleBackup = "backup" + VipKeepAlivedReplicas = 3 + VipKeepalivedComponentName = "keepalived" + ManifestComponentsConfigMap = "components-manifest-cm" WaitAllPodsRunningTimeoutSeconds = 1800 diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 92000d5cd..9f75756b3 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "fmt" + "github.com/kosmos.io/kosmos/pkg/kubenest/tasks" "sort" "sync" "time" @@ -39,6 +40,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" "github.com/kosmos.io/kosmos/pkg/kubenest/util" apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" + cs "k8s.io/client-go/kubernetes" ) type VirtualClusterInitController struct { @@ -62,6 +64,10 @@ type HostPortPool struct { PortsPool []int32 `yaml:"portsPool"` } +type VipPool struct { + Vips []string `yaml:"vipPool"` +} + const ( VirtualClusterControllerFinalizer = "kosmos.io/virtualcluster-controller" RequeueTime = 10 * time.Second @@ -142,6 +148,23 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } case v1alpha1.AllNodeReady: + name, namespace := request.Name, request.Namespace + // check if the vc enable vip + if len(originalCluster.Status.VipMap) > 0 { + // label node for keepalived + vcClient, err := tasks.GetVcClientset(c.RootClientSet, name, namespace) + if err != nil { + klog.Errorf("Get vc client failed. err: %s", err.Error()) + return reconcile.Result{}, errors.Wrapf(err, "Get vc client failed. err: %s", err.Error()) + } + reps, err := c.labelNode(vcClient) + if err != nil { + klog.Errorf("Label node for keepalived failed. err: %s", err.Error()) + return reconcile.Result{}, errors.Wrapf(err, "Label node for keepalived failed. 
err: %s", err.Error()) + } + klog.V(2).Infof("Label %d node for keepalived", reps) + } + err := c.ensureAllPodsRunning(updatedCluster, constants.WaitAllPodsRunningTimeoutSeconds*time.Second) if err != nil { klog.Errorf("Check all pods running err: %s", err.Error()) @@ -279,6 +302,16 @@ func (c *VirtualClusterInitController) createVirtualCluster(virtualCluster *v1al if err != nil { return errors.Wrap(err, "Error in assign host port!") } + // check if enable vip + vipPool, err := GetVipFromConfigMap(c.RootClientSet, constants.KosmosNs, constants.VipPoolConfigMapName, constants.VipPoolKey) + if err == nil && vipPool != nil && len(vipPool.Vips) > 0 { + klog.V(2).Infof("Enable vip for virtual cluster %s", virtualCluster.Name) + //Allocate vip + err = c.AllocateVip(virtualCluster, vipPool) + if err != nil { + return errors.Wrap(err, "Error in allocate vip!") + } + } executer, err := NewExecutor(virtualCluster, c.Client, c.Config, kubeNestOptions) if err != nil { @@ -621,6 +654,15 @@ func (c *VirtualClusterInitController) ensureAllPodsRunning(virtualCluster *v1al return nil } +func mapContains(big map[string]string, small map[string]string) bool { + for k, v := range small { + if bigV, ok := big[k]; !ok || bigV != v { + return false + } + } + return true +} + func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataKey string) (*HostPortPool, error) { hostPorts, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), cmName, metav1.GetOptions{}) if err != nil { @@ -640,6 +682,25 @@ func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataK return &hostPool, nil } +func GetVipFromConfigMap(client kubernetes.Interface, ns, cmName, key string) (*VipPool, error) { + vipPoolCm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), cmName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + yamlData, exist := vipPoolCm.Data[key] + if !exist { + return nil, fmt.Errorf("key '%s' not found in vip pool ConfigMap '%s'", key, cmName) + } + + var vipPool VipPool + if err := yaml.Unmarshal([]byte(yamlData), &vipPool); err != nil { + return nil, err + } + + return &vipPool, nil +} + // Return false to indicate that the port is not occupied func (c *VirtualClusterInitController) isPortAllocated(port int32, hostAddress []string) bool { vcList := &v1alpha1.VirtualClusterList{} @@ -754,6 +815,7 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 } hostPool, err := GetHostPortPoolFromConfigMap(c.RootClientSet, constants.KosmosNs, constants.HostPortsCMName, constants.HostPortsCMDataName) if err != nil { + klog.Errorf("get host port pool error: %v", err) return 0, err } @@ -775,6 +837,7 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 return ports }() if len(ports) < constants.VirtualClusterPortNum { + klog.Errorf("no available ports to allocate") return 0, fmt.Errorf("no available ports to allocate") } virtualCluster.Status.PortMap = make(map[string]int32) @@ -788,3 +851,97 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 return 0, err } + +// AllocateVip allocate vip for virtual cluster +// #nosec G602 +func (c *VirtualClusterInitController) AllocateVip(virtualCluster *v1alpha1.VirtualCluster, vipPool *VipPool) error { + c.lock.Lock() + defer c.lock.Unlock() + if len(virtualCluster.Status.VipMap) > 0 { + return nil + } + klog.V(4).InfoS("get vip pool", "vipPool", vipPool) + // check if specified vip is available + if len(virtualCluster.Spec.ExternalVips) 
diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go
index 386bacd86..a6111e80a 100644
--- a/pkg/kubenest/init.go
+++ b/pkg/kubenest/init.go
@@ -35,6 +35,7 @@ type initData struct {
 	privateRegistry string
 	externalIP      string
 	externalIps     []string
+	vipMap          map[string]string
 	hostPort        int32
 	hostPortMap     map[string]int32
 	kubeNestOptions *v1alpha1.KubeNestConfiguration
@@ -67,7 +68,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
 	initPhase.AppendTask(tasks.NewCheckControlPlaneTask())
 	initPhase.AppendTask(tasks.NewAnpTask())
 	// create proxy
-	initPhase.AppendTask(tasks.NewVirtualClusterProxyTask())
+	//initPhase.AppendTask(tasks.NewVirtualClusterProxyTask())
 	// create core-dns
 	initPhase.AppendTask(tasks.NewCoreDNSTask())
 	// add server
@@ -188,8 +189,10 @@ func newRunData(opt *InitOptions) (*initData, error) {
 		privateRegistry: utils.DefaultImageRepository,
 		CertStore:       cert.NewCertStore(),
 		externalIP:      opt.virtualCluster.Spec.ExternalIP,
+		externalIps:     opt.virtualCluster.Spec.ExternalIps,
 		hostPort:        opt.virtualCluster.Status.Port,
 		hostPortMap:     opt.virtualCluster.Status.PortMap,
+		vipMap:          opt.virtualCluster.Status.VipMap,
 		kubeNestOptions: opt.KubeNestOptions,
 		virtualCluster:  opt.virtualCluster,
 		ETCDUnitSize:    opt.KubeNestOptions.KubeInKubeConfig.ETCDUnitSize,
@@ -255,6 +258,9 @@ func (i initData) ExternalIP() string {
 func (i initData) ExternalIPs() []string {
 	return i.externalIps
 }
+func (i initData) VipMap() map[string]string {
+	return i.vipMap
+}
 func (i initData) HostPort() int32 {
 	return i.hostPort
 }
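`newRunData` threads `Status.VipMap` into the shared `initData`, so every task in the phase can read the allocated VIP through the `VipMap()` accessor. A sketch (hypothetical helper; the `workflow` import path is assumed from the package layout in this diff) of how a task recovers it:

```go
package example

import (
	"errors"

	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
	"github.com/kosmos.io/kosmos/pkg/kubenest/tasks"
	"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
)

// keepalivedVip returns the allocated VIP, if any, from the phase RunData —
// the same pattern applyComponentsManifests uses further down in this diff.
func keepalivedVip(r workflow.RunData) (string, bool, error) {
	data, ok := r.(tasks.InitData)
	if !ok {
		return "", false, errors.New("invalid data struct")
	}
	vip := data.VipMap()[constants.VcVipStatusKey]
	return vip, vip != "", nil
}
```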
diff --git a/pkg/kubenest/tasks/anp.go b/pkg/kubenest/tasks/anp.go
index 60eee0313..50d4bb73e 100644
--- a/pkg/kubenest/tasks/anp.go
+++ b/pkg/kubenest/tasks/anp.go
@@ -311,7 +311,7 @@ func getVcDynamicClient(client clientset.Interface, name, namespace string) (dyn
 	}
 	return dynamicClient, nil
 }
-func getVcClientset(client clientset.Interface, name, namespace string) (clientset.Interface, error) {
+func GetVcClientset(client clientset.Interface, name, namespace string) (clientset.Interface, error) {
 	secret, err := client.CoreV1().Secrets(namespace).Get(context.TODO(),
 		fmt.Sprintf("%s-%s", name, constants.AdminConfig), metav1.GetOptions{})
 	if err != nil {
@@ -346,7 +346,7 @@ func runUploadProxyAgentCert(r workflow.RunData) error {
 			certsData[c.CertName()] = c.CertData()
 		}
 	}
-	vcClient, err := getVcClientset(data.RemoteClient(), name, namespace)
+	vcClient, err := GetVcClientset(data.RemoteClient(), name, namespace)
 	if err != nil {
 		return fmt.Errorf("failed to get virtual cluster client, err: %w", err)
 	}
diff --git a/pkg/kubenest/tasks/cert.go b/pkg/kubenest/tasks/cert.go
index 28379a08a..f674c2fc1 100644
--- a/pkg/kubenest/tasks/cert.go
+++ b/pkg/kubenest/tasks/cert.go
@@ -136,6 +136,7 @@ func mutateCertConfig(data InitData, cc *cert.CertConfig) error {
 		ClusterIps:  data.ServiceClusterIp(),
 		ExternalIP:  data.ExternalIP(),
 		ExternalIPs: data.ExternalIPs(),
+		VipMap:      data.VipMap(),
 	}, cc)
 	if err != nil {
 		return err
diff --git a/pkg/kubenest/tasks/data.go b/pkg/kubenest/tasks/data.go
index 54732ed0f..4093d2285 100644
--- a/pkg/kubenest/tasks/data.go
+++ b/pkg/kubenest/tasks/data.go
@@ -20,10 +20,11 @@ type InitData interface {
 	DataDir() string
 	VirtualCluster() *v1alpha1.VirtualCluster
 	ExternalIP() string
+	ExternalIPs() []string
 	HostPort() int32
 	HostPortMap() map[string]int32
+	VipMap() map[string]string
 	DynamicClient() *dynamic.DynamicClient
 	KubeNestOpt() *v1alpha1.KubeNestConfiguration
-	ExternalIPs() []string
 	PluginOptions() map[string]string
 }
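`mutateCertConfig` now copies `VipMap()` into the cert config, and every value in the map ends up as an extra SAN on the apiserver certificate (see the `certs.go` hunk below). A sketch of the config it assembles, setting only fields visible in this diff:

```go
package example

import (
	cert "github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
)

// altNamesConfig builds the SAN-mutator input; values are illustrative.
func altNamesConfig(vips map[string]string) *cert.AltNamesMutatorConfig {
	return &cert.AltNamesMutatorConfig{
		ExternalIPs: []string{"10.0.0.1"}, // assumed example address
		VipMap:      vips,                 // e.g. {"vip-key": "192.168.0.4"}
	}
}
```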
diff --git a/pkg/kubenest/tasks/manifests_components.go b/pkg/kubenest/tasks/manifests_components.go
index a68cf846f..1cdcd70ac 100644
--- a/pkg/kubenest/tasks/manifests_components.go
+++ b/pkg/kubenest/tasks/manifests_components.go
@@ -59,7 +59,7 @@ func applyComponentsManifests(r workflow.RunData) error {
 	if !ok {
 		return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct")
 	}
-
+	keepalivedReplicas := constants.VipKeepAlivedReplicas
 	secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(),
 		fmt.Sprintf("%s-%s", data.GetName(), constants.AdminConfig), metav1.GetOptions{})
 	if err != nil {
@@ -83,13 +83,26 @@ func applyComponentsManifests(r workflow.RunData) error {
 	templatedMapping["KUBE_PROXY_KUBECONFIG"] = string(secret.Data[constants.KubeConfig])
 	imageRepository, _ := util.GetImageMessage()
 	templatedMapping["ImageRepository"] = imageRepository
-
-	for k, v := range data.PluginOptions() {
-		templatedMapping[k] = v
+	keepalivedEnable := data.VipMap() != nil && data.VipMap()[constants.VcVipStatusKey] != ""
+	if keepalivedEnable {
+		templatedMapping["Vip"] = data.VipMap()[constants.VcVipStatusKey]
+		// use min replicas
+		nodeCount := data.VirtualCluster().Spec.PromotePolicies[0].NodeCount
+		if nodeCount < constants.VipKeepAlivedReplicas {
+			keepalivedReplicas = int(nodeCount)
+		}
+		for k, v := range data.PluginOptions() {
+			templatedMapping[k] = v
+		}
+		templatedMapping["KeepalivedReplicas"] = keepalivedReplicas
 	}
 
 	for _, component := range components {
 		klog.V(2).Infof("Deploy component %s", component.Name)
+		// skip keepalived component if vip is not enabled
+		if !keepalivedEnable && component.Name == constants.VipKeepalivedComponentName {
+			continue
+		}
 
 		err = applyTemplatedManifests(component.Name, dynamicClient, component.Path, templatedMapping)
 		if err != nil {
 			return err
diff --git a/pkg/kubenest/util/cert/certs.go b/pkg/kubenest/util/cert/certs.go
index dd608c004..5718662f4 100644
--- a/pkg/kubenest/util/cert/certs.go
+++ b/pkg/kubenest/util/cert/certs.go
@@ -44,6 +44,7 @@ type AltNamesMutatorConfig struct {
 	ClusterIps  []string
 	ExternalIP  string
 	ExternalIPs []string
+	VipMap      map[string]string
 }
 
 func (config *CertConfig) defaultPublicKeyAlgorithm() {
@@ -288,6 +289,11 @@ func apiServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, e
 		}
 	}
 
+	if len(cfg.VipMap) > 0 {
+		for _, vip := range cfg.VipMap {
+			appendSANsToAltNames(altNames, []string{vip})
+		}
+	}
 	if len(cfg.ClusterIps) > 0 {
 		for _, clusterIp := range cfg.ClusterIps {
 			appendSANsToAltNames(altNames, []string{clusterIp})
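The task clamps the keepalived replica count to min(NodeCount, 3) and hands the VIP to the manifests through the `Vip` and `KeepalivedReplicas` mapping keys. A toy, self-contained render of that mapping (the template text is assumed; only the keys come from this patch):

```go
package main

import (
	"os"
	"text/template"
)

func main() {
	// Replica clamp as in applyComponentsManifests: min(nodeCount, 3).
	nodeCount := int32(2) // illustrative; read from PromotePolicies[0].NodeCount
	replicas := 3         // constants.VipKeepAlivedReplicas
	if nodeCount < 3 {
		replicas = int(nodeCount)
	}

	// A keepalived manifest fragment rendered with the same keys the task fills.
	tpl := template.Must(template.New("keepalived").Parse(
		"virtual_ipaddress: {{ .Vip }}\nreplicas: {{ .KeepalivedReplicas }}\n"))
	_ = tpl.Execute(os.Stdout, map[string]interface{}{
		"Vip":                "192.168.0.4",
		"KeepalivedReplicas": replicas,
	})
}
```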
diff --git a/pkg/kubenest/util/util.go b/pkg/kubenest/util/util.go
index 49bb084d2..34457f3a5 100644
--- a/pkg/kubenest/util/util.go
+++ b/pkg/kubenest/util/util.go
@@ -1,8 +1,10 @@
 package util
 
 import (
+	"crypto/rand"
 	"encoding/base64"
 	"fmt"
+	"math/big"
 	"net"
 	"strings"
 
@@ -89,6 +91,121 @@ func IPV6First(ipNetStr string) (bool, error) {
 	return utils.IsIPv6(ipNetStrArray[0]), nil
 }
 
+// parseCIDR returns a channel that generates IP addresses in the CIDR range.
+func parseCIDR(cidr string) (chan string, error) {
+	ip, ipnet, err := net.ParseCIDR(cidr)
+	if err != nil {
+		return nil, err
+	}
+	ch := make(chan string)
+	go func() {
+		for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
+			ch <- ip.String()
+		}
+		close(ch)
+	}()
+	return ch, nil
+}
+
+// inc increments an IP address.
+func inc(ip net.IP) {
+	for j := len(ip) - 1; j >= 0; j-- {
+		ip[j]++
+		if ip[j] > 0 {
+			break
+		}
+	}
+}
+
+// parseRange returns a channel that generates IP addresses in the range.
+func parseRange(ipRange string) (chan string, error) {
+	parts := strings.Split(ipRange, "-")
+	if len(parts) != 2 {
+		return nil, fmt.Errorf("invalid IP range format: %s", ipRange)
+	}
+	startIP := net.ParseIP(parts[0])
+	endIP := net.ParseIP(parts[1])
+	if startIP == nil || endIP == nil {
+		return nil, fmt.Errorf("invalid IP address in range: %s", ipRange)
+	}
+
+	ch := make(chan string)
+	go func() {
+		for ip := startIP; !ip.Equal(endIP); inc(ip) {
+			ch <- ip.String()
+		}
+		ch <- endIP.String()
+		close(ch)
+	}()
+	return ch, nil
+}
+
+// parseVIPPool returns a channel that generates IP addresses from the vipPool.
+func parseVIPPool(vipPool []string) (chan string, error) {
+	ch := make(chan string)
+	go func() {
+		defer close(ch)
+		for _, entry := range vipPool {
+			entry = strings.TrimSpace(entry)
+			var ipCh chan string
+			var err error
+			if strings.Contains(entry, "/") {
+				ipCh, err = parseCIDR(entry)
+			} else if strings.Contains(entry, "-") {
+				ipCh, err = parseRange(entry)
+			} else {
+				ip := net.ParseIP(entry)
+				if ip == nil {
+					err = fmt.Errorf("invalid IP address: %s", entry)
+				} else {
+					ipCh = make(chan string, 1)
+					ipCh <- entry
+					close(ipCh)
+				}
+			}
+			if err != nil {
+				fmt.Println("Error:", err)
+				return
+			}
+			for ip := range ipCh {
+				ch <- ip
+			}
+		}
+	}()
+	return ch, nil
+}
+
+// FindAvailableIP finds an available IP address from vipPool that is not in allocatedVips.
+func FindAvailableIP(vipPool, allocatedVips []string) (string, error) {
+	allocatedSet := make(map[string]struct{})
+	for _, ip := range allocatedVips {
+		allocatedSet[ip] = struct{}{}
+	}
+
+	ipCh, err := parseVIPPool(vipPool)
+	if err != nil {
+		return "", err
+	}
+
+	for ip := range ipCh {
+		if _, allocated := allocatedSet[ip]; !allocated {
+			return ip, nil
+		}
+	}
+
+	return "", fmt.Errorf("no available IP addresses")
+}
+
+// SecureRandomInt returns a uniform random int in [0, n) using crypto/rand.
+func SecureRandomInt(n int) (int, error) {
+	bigN := big.NewInt(int64(n))
+	randInt, err := rand.Int(rand.Reader, bigN)
+	if err != nil {
+		return 0, err
+	}
+	return int(randInt.Int64()), nil
+}
+
 func MapContains(big map[string]string, small map[string]string) bool {
 	for k, v := range small {
 		if bigV, ok := big[k]; !ok || bigV != v {
@@ -97,3 +214,78 @@ func MapContains(big map[string]string, small map[string]string) bool {
 	}
 	return true
 }
+
+func IsIPAvailable(ips, vipPool []string) (string, error) {
+	for _, ip := range ips {
+		if b, err := IsIPInRange(ip, vipPool); b && err == nil {
+			return ip, nil
+		}
+	}
+	return "", fmt.Errorf("specified IP not available in the VIP pool")
+}
+
+// IsIPInRange checks if the given IP is in any of the provided IP ranges
+func IsIPInRange(ipStr string, ranges []string) (bool, error) {
+	ip := net.ParseIP(ipStr)
+	if ip == nil {
+		return false, fmt.Errorf("invalid IP address: %s", ipStr)
+	}
+
+	for _, r := range ranges {
+		if strings.Contains(r, "/") {
+			// Handle CIDR notation
+			_, ipNet, err := net.ParseCIDR(r)
+			if err != nil {
+				return false, fmt.Errorf("invalid CIDR notation: %s", r)
+			}
+			if ipNet.Contains(ip) {
+				return true, nil
+			}
+		} else if strings.Contains(r, "-") {
+			// Handle IP range notation
+			ips := strings.Split(r, "-")
+			if len(ips) != 2 {
+				return false, fmt.Errorf("invalid range notation: %s", r)
+			}
+			startIP := net.ParseIP(strings.TrimSpace(ips[0]))
+			endIP := net.ParseIP(strings.TrimSpace(ips[1]))
+			if startIP == nil || endIP == nil {
+				return false, fmt.Errorf("invalid IP range: %s", r)
+			}
+			if compareIPs(ip, startIP) >= 0 && compareIPs(ip, endIP) <= 0 {
+				return true, nil
+			}
+		} else {
+			return false, fmt.Errorf("invalid IP range or CIDR format: %s", r)
+		}
+	}
+
+	return false, nil
+}
+
+// compareIPs compares two IP addresses, returns -1 if ip1 < ip2, 1 if ip1 > ip2, and 0 if they are equal
+func compareIPs(ip1, ip2 net.IP) int {
+	if ip1.To4() != nil && ip2.To4() != nil {
+		return compareBytes(ip1.To4(), ip2.To4())
+	}
+	return compareBytes(ip1, ip2)
+}
+
+// compareBytes compares two byte slices, returns -1 if a < b, 1 if a > b, and 0 if they are equal
+func compareBytes(a, b []byte) int {
+	for i := 0; i < len(a) && i < len(b); i++ {
+		if a[i] < b[i] {
+			return -1
+		}
+		if a[i] > b[i] {
+			return 1
+		}
+	}
+	if len(a) < len(b) {
+		return -1
+	}
+	if len(a) > len(b) {
+		return 1
+	}
+	return 0
+}
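The exported helpers compose as follows — `FindAvailableIP` drains the lazy generator from `parseVIPPool` and skips already-allocated addresses, while `IsIPInRange` validates a pinned VIP against dash ranges and CIDRs (note it does not accept bare single-IP pool entries). A short usage sketch:

```go
package main

import (
	"fmt"

	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

func main() {
	// Pool entries may be a single IP, a dash range, or a CIDR (see parsers above).
	pool := []string{"192.168.0.1-192.168.0.3"}

	vip, err := util.FindAvailableIP(pool, []string{"192.168.0.1"})
	fmt.Println(vip, err) // 192.168.0.2 <nil>

	ok, err := util.IsIPInRange("192.168.0.2", pool)
	fmt.Println(ok, err) // true <nil>
}
```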
diff --git a/pkg/kubenest/util/util_test.go b/pkg/kubenest/util/util_test.go
new file mode 100644
index 000000000..403785a25
--- /dev/null
+++ b/pkg/kubenest/util/util_test.go
@@ -0,0 +1,106 @@
+package util
+
+import (
+	"fmt"
+	"testing"
+
+	"gopkg.in/yaml.v3"
+)
+
+func TestFindAvailableIP(t *testing.T) {
+	type args struct {
+		vipPool       []string
+		allocatedVips []string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    string
+		wantErr bool
+	}{
+		{
+			name: "test1",
+			args: args{
+				vipPool:       []string{"192.168.0.1", "192.168.0.2", "192.168.0.3"},
+				allocatedVips: []string{"192.168.0.1", "192.168.0.2"},
+			},
+			want:    "192.168.0.3",
+			wantErr: false,
+		},
+		{
+			name: "test2",
+			args: args{
+				vipPool: []string{
+					"192.168.0.1",
+					"192.168.0.2-192.168.0.10",
+					"192.168.1.0/24",
+					"2001:db8::1",
+					"2001:db8::1-2001:db8::10",
+					"2001:db8::/64",
+				},
+				allocatedVips: []string{"192.168.0.1", "192.168.0.2"},
+			},
+			want:    "192.168.0.3",
+			wantErr: false,
+		},
+		{
+			name: "test3",
+			args: args{
+				vipPool: []string{
+					"192.168.6.110-192.168.6.120",
+				},
+				allocatedVips: []string{},
+			},
+			want:    "192.168.6.110",
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := FindAvailableIP(tt.args.vipPool, tt.args.allocatedVips)
+			fmt.Printf("got vip : %v", got)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("FindAvailableIP() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("FindAvailableIP() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestFindAvailableIP2(t *testing.T) {
+	type HostPortPool struct {
+		PortsPool []int32 `yaml:"portsPool"`
+	}
+	type VipPool struct {
+		Vip []string `yaml:"vipPool"`
+	}
+	var vipPool VipPool
+	var hostPortPool HostPortPool
+	yamlData2 := `
+portsPool:
+  - 33001
+  - 33002
+  - 33003
+  - 33004
+  - 33005
+  - 33006
+  - 33007
+  - 33008
+  - 33009
+  - 33010
+`
+	yamlData := `
+vipPool:
+  - 192.168.6.110-192.168.6.120
+`
+	if err := yaml.Unmarshal([]byte(yamlData), &vipPool); err != nil {
+		panic(err)
+	}
+	if err := yaml.Unmarshal([]byte(yamlData2), &hostPortPool); err != nil {
+		panic(err)
+	}
+	fmt.Printf("vipPool: %v", vipPool)
+}
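`TestFindAvailableIP2` only prints the unmarshaled pool; a tightened variant (a suggestion, not part of this patch) would assert the YAML round-trip instead:

```go
package util

import (
	"testing"

	"gopkg.in/yaml.v3"
)

// TestVipPoolUnmarshal asserts that the vip-config.yaml shape decodes into
// exactly the one configured range.
func TestVipPoolUnmarshal(t *testing.T) {
	type VipPool struct {
		Vip []string `yaml:"vipPool"`
	}
	var pool VipPool
	data := "vipPool:\n  - 192.168.6.110-192.168.6.120\n"
	if err := yaml.Unmarshal([]byte(data), &pool); err != nil {
		t.Fatal(err)
	}
	if len(pool.Vip) != 1 || pool.Vip[0] != "192.168.6.110-192.168.6.120" {
		t.Fatalf("unexpected vip pool: %+v", pool)
	}
}
```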
diff --git a/vendor/github.com/pelletier/go-toml/example-crlf.toml b/vendor/github.com/pelletier/go-toml/example-crlf.toml
index 780d9c68f..f45bf88b8 100644
--- a/vendor/github.com/pelletier/go-toml/example-crlf.toml
+++ b/vendor/github.com/pelletier/go-toml/example-crlf.toml
@@ -1,30 +1,30 @@
-# This is a TOML document. Boom.
-
-title = "TOML Example"
-
-[owner]
-name = "Tom Preston-Werner"
-organization = "GitHub"
-bio = "GitHub Cofounder & CEO\nLikes tater tots and beer."
-dob = 1979-05-27T07:32:00Z # First class dates? Why not?
-
-[database]
-server = "192.168.1.1"
-ports = [ 8001, 8001, 8002 ]
-connection_max = 5000
-enabled = true
-
-[servers]
-
-  # You can indent as you please. Tabs or spaces. TOML don't care.
-  [servers.alpha]
-  ip = "10.0.0.1"
-  dc = "eqdc10"
-
-  [servers.beta]
-  ip = "10.0.0.2"
-  dc = "eqdc10"
-
-[clients]
-data = [ ["gamma", "delta"], [1, 2] ] # just an update to make sure parsers support it
+# This is a TOML document. Boom.
+
+title = "TOML Example"
+
+[owner]
+name = "Tom Preston-Werner"
+organization = "GitHub"
+bio = "GitHub Cofounder & CEO\nLikes tater tots and beer."
+dob = 1979-05-27T07:32:00Z # First class dates? Why not?
+
+[database]
+server = "192.168.1.1"
+ports = [ 8001, 8001, 8002 ]
+connection_max = 5000
+enabled = true
+
+[servers]
+
+  # You can indent as you please. Tabs or spaces. TOML don't care.
+  [servers.alpha]
+  ip = "10.0.0.1"
+  dc = "eqdc10"
+
+  [servers.beta]
+  ip = "10.0.0.2"
+  dc = "eqdc10"
+
+[clients]
+data = [ ["gamma", "delta"], [1, 2] ] # just an update to make sure parsers support it
 score = 4e-08 # to make sure leading zeroes in exponent parts of floats are supported
\ No newline at end of file