From ad69d5ec2db807378c18e1c2e2802fa3417484c1 Mon Sep 17 00:00:00 2001 From: GreatLazyMan Date: Mon, 18 Dec 2023 10:24:33 +0800 Subject: [PATCH] title: Add ipsec tunnel mode to support cross clusters and elastic ip Signed-off-by: GreatLazyMan --- cluster/images/Dockerfile | 2 +- cmd/clusterlink/elector/app/elector.go | 7 +- deploy/crds/kosmos.io_clusternodes.yaml | 8 +- deploy/crds/kosmos.io_clusters.yaml | 12 + deploy/crds/kosmos.io_nodeconfigs.yaml | 58 ++ pkg/apis/kosmos/v1alpha1/cluster_types.go | 8 + pkg/apis/kosmos/v1alpha1/clusternode_types.go | 5 +- pkg/apis/kosmos/v1alpha1/constants.go | 14 + pkg/apis/kosmos/v1alpha1/nodeconfig_types.go | 57 +- .../network-manager/network_manager.go | 72 ++- pkg/clusterlink/constants.go | 16 + .../calicoippool/calicoippool_controller.go | 2 +- .../controllers/cluster/cluster_controller.go | 93 ++- .../controllers/node/node_controller.go | 25 +- pkg/clusterlink/elector/elector.go | 67 ++- .../network-manager/handlers/cni_support.go | 38 ++ .../network-manager/handlers/nodeconfig.go | 29 +- .../network-manager/handlers/pod_routes.go | 93 ++- .../network-manager/helpers/filter.go | 18 + .../network-manager/helpers/sort.go | 51 ++ .../network-manager/network_manager.go | 3 +- pkg/clusterlink/network/adapter.go | 158 ++++++ pkg/clusterlink/network/avoidmasq.go | 122 ++++ pkg/clusterlink/network/interface.go | 9 + pkg/clusterlink/network/iptables/iptables.go | 31 + pkg/clusterlink/network/xfrm_policy.go | 180 ++++++ pkg/kosmosctl/install/install.go | 116 ++-- pkg/kosmosctl/join/join.go | 51 +- pkg/kosmosctl/manifest/manifest_crds.go | 83 ++- .../manifest/manifest_deployments.go | 6 +- .../k8s.io/kubernetes/pkg/util/ipset/OWNERS | 11 + .../k8s.io/kubernetes/pkg/util/ipset/ipset.go | 532 ++++++++++++++++++ .../k8s.io/kubernetes/pkg/util/ipset/types.go | 65 +++ vendor/modules.txt | 1 + 34 files changed, 1949 insertions(+), 94 deletions(-) create mode 100644 pkg/clusterlink/constants.go create mode 100644 pkg/clusterlink/network-manager/handlers/cni_support.go create mode 100644 pkg/clusterlink/network/avoidmasq.go create mode 100644 pkg/clusterlink/network/xfrm_policy.go create mode 100644 vendor/k8s.io/kubernetes/pkg/util/ipset/OWNERS create mode 100644 vendor/k8s.io/kubernetes/pkg/util/ipset/ipset.go create mode 100644 vendor/k8s.io/kubernetes/pkg/util/ipset/types.go diff --git a/cluster/images/Dockerfile b/cluster/images/Dockerfile index 6da014abe..0b5fc8a50 100644 --- a/cluster/images/Dockerfile +++ b/cluster/images/Dockerfile @@ -4,6 +4,6 @@ ARG BINARY RUN apk add --no-cache ca-certificates RUN apk update && apk upgrade -RUN apk add ip6tables iptables curl tcpdump busybox-extras +RUN apk add ip6tables iptables ipset curl tcpdump busybox-extras COPY ${BINARY} /bin/${BINARY} diff --git a/cmd/clusterlink/elector/app/elector.go b/cmd/clusterlink/elector/app/elector.go index e805615f1..334363e34 100644 --- a/cmd/clusterlink/elector/app/elector.go +++ b/cmd/clusterlink/elector/app/elector.go @@ -126,10 +126,11 @@ func run(ctx context.Context, opts *options.Options) error { err := elector.EnsureGateWayRole() if err != nil { klog.Errorf("set gateway role failure: %v, retry after 10 sec.", err) - time.Sleep(10 * time.Second) + time.Sleep(3 * time.Second) } else { - klog.V(4).Info("ensure gateway role success, recheck after 60 sec.") - time.Sleep(60 * time.Second) + timeToRecheck := 3 * time.Second + klog.V(4).Infof("ensure gateway role success, recheck after %d sec.", int(timeToRecheck)) + time.Sleep(timeToRecheck) } } } diff --git a/deploy/crds/kosmos.io_clusternodes.yaml b/deploy/crds/kosmos.io_clusternodes.yaml index fb61383c0..9d5dace88 100644 --- a/deploy/crds/kosmos.io_clusternodes.yaml +++ b/deploy/crds/kosmos.io_clusternodes.yaml @@ -45,6 +45,8 @@ spec: properties: clusterName: type: string + elasticip: + type: string interfaceName: type: string ip: @@ -63,11 +65,13 @@ spec: type: array type: object status: + properties: + nodeStatus: + type: string type: object required: - spec type: object served: true storage: true - subresources: - status: {} + subresources: {} diff --git a/deploy/crds/kosmos.io_clusters.yaml b/deploy/crds/kosmos.io_clusters.yaml index cbf674914..6b6d78424 100644 --- a/deploy/crds/kosmos.io_clusters.yaml +++ b/deploy/crds/kosmos.io_clusters.yaml @@ -58,6 +58,10 @@ spec: - ip - ip6 type: object + clusterpodCIDRs: + items: + type: string + type: array cni: default: calico type: string @@ -107,9 +111,17 @@ spec: - nodeName type: object type: array + nodeElasticIPMap: + additionalProperties: + type: string + description: NodeElasticIPMap presents mapping between nodename + in kubernetes and elasticIP + type: object useIPPool: default: false type: boolean + useexternalapiserver: + type: boolean type: object clusterTreeOptions: properties: diff --git a/deploy/crds/kosmos.io_nodeconfigs.yaml b/deploy/crds/kosmos.io_nodeconfigs.yaml index 78679263e..f865e0bb4 100644 --- a/deploy/crds/kosmos.io_nodeconfigs.yaml +++ b/deploy/crds/kosmos.io_nodeconfigs.yaml @@ -92,6 +92,18 @@ spec: - mac type: object type: array + ipsetsavoidmasq: + items: + properties: + cidr: + type: string + name: + type: string + required: + - cidr + - name + type: object + type: array iptables: items: properties: @@ -122,6 +134,52 @@ spec: - gw type: object type: array + xfrmpolicies: + items: + properties: + dir: + type: integer + leftip: + type: string + leftnet: + type: string + reqid: + type: integer + rightip: + type: string + rightnet: + type: string + required: + - dir + - leftip + - leftnet + - reqid + - rightip + - rightnet + type: object + type: array + xfrmstates: + items: + properties: + PSK: + type: string + leftip: + type: string + reqid: + type: integer + rightip: + type: string + spi: + format: int32 + type: integer + required: + - PSK + - leftip + - reqid + - rightip + - spi + type: object + type: array type: object status: properties: diff --git a/pkg/apis/kosmos/v1alpha1/cluster_types.go b/pkg/apis/kosmos/v1alpha1/cluster_types.go index 8ba7bf20c..5945477f1 100644 --- a/pkg/apis/kosmos/v1alpha1/cluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/cluster_types.go @@ -94,6 +94,14 @@ type ClusterLinkOptions struct { // +optional AutodetectionMethod string `json:"autodetectionMethod,omitempty"` + + // NodeElasticIPMap presents mapping between nodename in kubernetes and elasticIP + // +optional + NodeElasticIPMap map[string]string `json:"nodeElasticIPMap,omitempty"` + // +optional + ClusterPodCIDRs []string `json:"clusterpodCIDRs,omitempty"` + // +optional + UseExternalApiserver bool `json:"useexternalapiserver,omitempty"` } type ClusterTreeOptions struct { diff --git a/pkg/apis/kosmos/v1alpha1/clusternode_types.go b/pkg/apis/kosmos/v1alpha1/clusternode_types.go index 8829d55b2..6708419d4 100644 --- a/pkg/apis/kosmos/v1alpha1/clusternode_types.go +++ b/pkg/apis/kosmos/v1alpha1/clusternode_types.go @@ -8,7 +8,6 @@ import ( // +genclient // +genclient:nonNamespaced -// +kubebuilder:subresource:status // +kubebuilder:resource:scope="Cluster" // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:printcolumn:name="ROLES",type=string,JSONPath=`.spec.roles` @@ -33,6 +32,8 @@ type ClusterNodeSpec struct { // +optional IP string `json:"ip,omitempty"` // +optional + ElasticIP string `json:"elasticip,omitempty"` + // +optional IP6 string `json:"ip6,omitempty"` // +optional Roles []Role `json:"roles,omitempty"` @@ -41,6 +42,8 @@ type ClusterNodeSpec struct { } type ClusterNodeStatus struct { + // +optional + NodeStatus string `json:"nodeStatus,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/kosmos/v1alpha1/constants.go b/pkg/apis/kosmos/v1alpha1/constants.go index 56b37c550..7361a43b1 100644 --- a/pkg/apis/kosmos/v1alpha1/constants.go +++ b/pkg/apis/kosmos/v1alpha1/constants.go @@ -28,3 +28,17 @@ type DeviceType string const ( VxlanDevice DeviceType = "vxlan" ) + +const ( + DefaultPSK string = "bfd6224354977084568832b811226b3d6cff6685" + DefaultPSKPreStr = "WelcometoKosmos" + DefaultReqID int = 336 +) + +type IPSECDirection int + +const ( + IPSECIn IPSECDirection = 0 + IPSECOut IPSECDirection = 1 + IPSECFwd IPSECDirection = 2 +) diff --git a/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go b/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go index 1f4a77a02..46e8b4f33 100644 --- a/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go +++ b/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go @@ -21,11 +21,14 @@ type NodeConfig struct { } type NodeConfigSpec struct { - Devices []Device `json:"devices,omitempty"` - Routes []Route `json:"routes,omitempty"` - Iptables []Iptables `json:"iptables,omitempty"` - Fdbs []Fdb `json:"fdbs,omitempty"` - Arps []Arp `json:"arps,omitempty"` + Devices []Device `json:"devices,omitempty"` + Routes []Route `json:"routes,omitempty"` + Iptables []Iptables `json:"iptables,omitempty"` + Fdbs []Fdb `json:"fdbs,omitempty"` + Arps []Arp `json:"arps,omitempty"` + XfrmPolicies []XfrmPolicy `json:"xfrmpolicies,omitempty"` + XfrmStates []XfrmState `json:"xfrmstates,omitempty"` + IPsetsAvoidMasqs []IPset `json:"ipsetsavoidmasq,omitempty"` } type NodeConfigStatus struct { @@ -101,6 +104,50 @@ func (a *Arp) Compare(v Arp) bool { a.Dev == v.Dev } +type XfrmPolicy struct { + LeftIP string `json:"leftip"` + LeftNet string `json:"leftnet"` + RightIP string `json:"rightip"` + RightNet string `json:"rightnet"` + ReqID int `json:"reqid"` + Dir int `json:"dir"` +} + +func (a *XfrmPolicy) Compare(v XfrmPolicy) bool { + return a.LeftIP == v.LeftIP && + a.LeftNet == v.LeftNet && + a.RightNet == v.RightNet && + a.RightIP == v.RightIP && + a.ReqID == v.ReqID && + a.Dir == v.Dir +} + +type XfrmState struct { + LeftIP string `json:"leftip"` + RightIP string `json:"rightip"` + ReqID int `json:"reqid"` + SPI uint32 `json:"spi"` + PSK string `json:"PSK"` +} + +func (a *XfrmState) Compare(v XfrmState) bool { + return a.LeftIP == v.LeftIP && + a.RightIP == v.RightIP && + a.ReqID == v.ReqID && + a.PSK == v.PSK && + a.SPI == v.SPI +} + +type IPset struct { + CIDR string `json:"cidr"` + Name string `json:"name"` +} + +func (a *IPset) Compare(v IPset) bool { + return a.CIDR == v.CIDR && + a.Name == v.Name +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type NodeConfigList struct { diff --git a/pkg/clusterlink/agent-manager/network-manager/network_manager.go b/pkg/clusterlink/agent-manager/network-manager/network_manager.go index f1898fa55..36761458b 100644 --- a/pkg/clusterlink/agent-manager/network-manager/network_manager.go +++ b/pkg/clusterlink/agent-manager/network-manager/network_manager.go @@ -112,6 +112,29 @@ func (e *NetworkManager) Diff(oldConfig, newConfig *clusterlinkv1alpha1.NodeConf createConfig.Routes = createRecord isSame = false } + // ipsec: + if flag, deleteRecord, createRecord := compareFunc(oldConfig.XfrmPolicies, newConfig.XfrmPolicies, func(a, b clusterlinkv1alpha1.XfrmPolicy) bool { + return a.Compare(b) + }); !flag { + deleteConfig.XfrmPolicies = deleteRecord + createConfig.XfrmPolicies = createRecord + isSame = false + } + if flag, deleteRecord, createRecord := compareFunc(oldConfig.XfrmStates, newConfig.XfrmStates, func(a, b clusterlinkv1alpha1.XfrmState) bool { + return a.Compare(b) + }); !flag { + deleteConfig.XfrmStates = deleteRecord + createConfig.XfrmStates = createRecord + isSame = false + } + //ipset + if flag, deleteRecord, createRecord := compareFunc(oldConfig.IPsetsAvoidMasqs, newConfig.IPsetsAvoidMasqs, func(a, b clusterlinkv1alpha1.IPset) bool { + return a.Compare(b) + }); !flag { + deleteConfig.IPsetsAvoidMasqs = deleteRecord + createConfig.IPsetsAvoidMasqs = createRecord + isSame = false + } // iptables: if flag, deleteRecord, createRecord := compareFunc(oldConfig.Iptables, newConfig.Iptables, func(a, b clusterlinkv1alpha1.Iptables) bool { return a.Compare(b) @@ -188,6 +211,24 @@ func (e *NetworkManager) WriteSys(configDiff *ConfigDiff) error { errs = errors.Wrap(err, fmt.Sprint(errs)) } } + if config.XfrmPolicies != nil { + if err := e.NetworkInterface.DeleteXfrmPolicies(config.XfrmPolicies); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.XfrmStates != nil { + if err := e.NetworkInterface.DeleteXfrmStates(config.XfrmStates); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.IPsetsAvoidMasqs != nil { + if err := e.NetworkInterface.DeleteIPsetsAvoidMasq(config.IPsetsAvoidMasqs); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } } if configDiff.createConfig != nil { @@ -223,6 +264,24 @@ func (e *NetworkManager) WriteSys(configDiff *ConfigDiff) error { errs = errors.Wrap(err, fmt.Sprint(errs)) } } + if config.XfrmPolicies != nil { + if err := e.NetworkInterface.AddXfrmPolicies(config.XfrmPolicies); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.XfrmStates != nil { + if err := e.NetworkInterface.AddXfrmStates(config.XfrmStates); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.IPsetsAvoidMasqs != nil { + if err := e.NetworkInterface.AddIPsetsAvoidMasq(config.IPsetsAvoidMasqs); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } } return errs @@ -254,11 +313,14 @@ func (e *NetworkManager) UpdateFromChecker() NodeConfigSyncStatus { } func printNodeConfig(data *clusterlinkv1alpha1.NodeConfigSpec) { - klog.Infof("device: ", data.Devices) - klog.Infof("Arps: ", data.Arps) - klog.Infof("Fdbs: ", data.Fdbs) - klog.Infof("Iptables: ", data.Iptables) - klog.Infof("Routes: ", data.Routes) + klog.Infof("device: %v", data.Devices) + klog.Infof("Arps: %v", data.Arps) + klog.Infof("Fdbs: %v", data.Fdbs) + klog.Infof("Iptables: %v", data.Iptables) + klog.Infof("Routes: %v", data.Routes) + klog.Infof("XfrmPolicys: %v", data.XfrmPolicies) + klog.Infof("XfrmStates: %v", data.XfrmStates) + klog.Infof("IPsetsAvoidMasqs: %v", data.IPsetsAvoidMasqs) } func (e *NetworkManager) UpdateSync() NodeConfigSyncStatus { diff --git a/pkg/clusterlink/constants.go b/pkg/clusterlink/constants.go new file mode 100644 index 000000000..a68419ef0 --- /dev/null +++ b/pkg/clusterlink/constants.go @@ -0,0 +1,16 @@ +package clusterlink + +const ( + FLANNEL = "flannel" + TECENT_GLOBALROUTER = "globalrouter" +) + +var ( + NonMasqCNI = map[string]struct{}{ + FLANNEL: {}, + TECENT_GLOBALROUTER: {}, + } + NonMasqCNISlice = []string{ + FLANNEL, TECENT_GLOBALROUTER, + } +) diff --git a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go index ba409c794..72f40c749 100644 --- a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go +++ b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go @@ -339,7 +339,7 @@ func (c *Controller) Reconcile(key utils.QueueKey) error { } klog.Infof("start reconcile cluster %s", cluster.Name) - if cluster.Spec.ClusterLinkOptions.CNI != utils.CNITypeCalico { + if cluster.Name == c.clusterName && cluster.Spec.ClusterLinkOptions.CNI != utils.CNITypeCalico { klog.Infof("cluster %s cni type is %s skip reconcile", cluster.Name, cluster.Spec.ClusterLinkOptions.CNI) return nil } diff --git a/pkg/clusterlink/controllers/cluster/cluster_controller.go b/pkg/clusterlink/controllers/cluster/cluster_controller.go index c3353484f..0a0db6c37 100644 --- a/pkg/clusterlink/controllers/cluster/cluster_controller.go +++ b/pkg/clusterlink/controllers/cluster/cluster_controller.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "net" "reflect" "strings" "time" @@ -32,6 +33,8 @@ import ( clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" + "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" @@ -41,10 +44,36 @@ import ( const ( FlannelCNI = "flannel" CiliumCNI = "cilium" + GlobalRouterCNI = "globalrouter" KubeFlannelConfigMap = "kube-flannel-cfg" KubeCiliumConfigMap = "cilium-config" KubeFlannelNetworkConf = "net-conf.json" KubeFlannelIPPool = "Network" + KubeSystemNamespace = "kube-system" + InvalidService = ` +apiVersion: v1 +kind: Service +metadata: + labels: + kosmos.io/app: coredns + name: invalidsvc + namespace: {{ .Namespace }} +spec: + clusterIP: 8.8.8.8 + clusterIPs: + - 8.8.8.8 + ipFamilies: + - IPv4 + ports: + - name: dns + port: 53 + protocol: UDP + targetPort: 53 + selector: + invalid/app: null + sessionAffinity: None + type: ClusterIP +` ) type SetClusterPodCIDRFun func(cluster *clusterlinkv1alpha1.Cluster) error @@ -97,15 +126,24 @@ func (c *Controller) Start(ctx context.Context) error { factory := informers.NewSharedInformerFactory(c.kubeClient, 0) informer := factory.Core().V1().Pods().Informer() c.podLister = factory.Core().V1().Pods().Lister() - podFilterFunc := func(pod *corev1.Pod) bool { - return pod.Labels["component"] == "kube-apiserver" - } cluster, err := c.clusterLinkClient.KosmosV1alpha1().Clusters().Get(ctx, c.clusterName, metav1.GetOptions{}) if err != nil { klog.Errorf("can not find local cluster %s, err: %v", c.clusterName, err) return err } + var podFilterFunc func(pod *corev1.Pod) bool + if cluster.Spec.ClusterLinkOptions.UseExternalApiserver { + podFilterFunc = func(pod *corev1.Pod) bool { + // some k8s, apiserver not a pod in cluster, maybe not a good way + return pod.Labels["k8s-app"] == "kube-proxy" || pod.Labels["app"] == "clusterlink-controller-manager" + } + } else { + podFilterFunc = func(pod *corev1.Pod) bool { + //TODO 确认这个写法是否正确 + return pod.Labels["component"] == "kube-apiserver" + } + } _, err = informer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { pod := obj.(*corev1.Pod) @@ -137,6 +175,15 @@ func (c *Controller) Start(ctx context.Context) error { klog.Errorf("cluster %s initCalicoInformer err: %v", err) return err } + } else if cluster.Spec.ClusterLinkOptions.CNI == GlobalRouterCNI { + c.setClusterPodCIDRFun = func(cluster *clusterlinkv1alpha1.Cluster) error { + if len(cluster.Spec.ClusterLinkOptions.ClusterPodCIDRs) == 0 { + klog.Errorf("Please define ClusterPodCIDRs for cni %s", GlobalRouterCNI) + return fmt.Errorf("clusterpodcidrs is not defined for cni %s", GlobalRouterCNI) + } + cluster.Status.ClusterLinkStatus.PodCIDRs = cluster.Spec.ClusterLinkOptions.ClusterPodCIDRs + return nil + } } else { isEtcd := CheckIsEtcd(cluster) if !isEtcd { @@ -162,12 +209,45 @@ func (c *Controller) Start(ctx context.Context) error { return nil } +func (c *Controller) GetSvcByCreateInvalidSvc() (*net.IPNet, error) { + // Aliyun ACK don't put apiserver as a pod in cluster, try to resolve svccidr from error message + // For reference: https://stackoverflow.com/questions/44190607/how-do-you-find-the-cluster-service-cidr-of-a-kubernetes-cluster + svc, err := util.GenerateService(InvalidService, manifest.ServiceReplace{ + Namespace: KubeSystemNamespace, + }) + if err != nil { + return nil, err + } + _, err = c.kubeClient.CoreV1().Services(KubeSystemNamespace).Create(context.Background(), svc, metav1.CreateOptions{}) + klog.Infof("Try creating invalid svc to get svccidr info ") + if err == nil { + err = c.kubeClient.CoreV1().Services(KubeSystemNamespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{}) + if err != nil { + return nil, fmt.Errorf("Strange create invalid svc succcessfully,but delete failed,error : %v", err) + } + return nil, fmt.Errorf("Strange create invalid svc succcessfully.") + } + + klog.Infof("Created invalid svc, the error is : %v ", err) + i := strings.Index(err.Error(), "The range of valid IPs is") + if i == -1 { + return nil, fmt.Errorf("can't find valid service cidr in error message") + } + s := strings.TrimSpace(err.Error()[i+len("The range of valid IPs is"):]) + _, svccidr, err := net.ParseCIDR(s) + if err != nil { + return nil, fmt.Errorf("can't find valid service cidr in error message, cidr str is %s", s) + } + return svccidr, nil +} + func (c *Controller) Reconcile(key utils.QueueKey) error { clusterWideKey, ok := key.(keys.ClusterWideKey) if !ok { klog.Error("invalid key") return fmt.Errorf("invalid key") } + klog.Info("cluster controller start reconcile") namespacedName := types.NamespacedName{ Name: clusterWideKey.Name, Namespace: clusterWideKey.Namespace, @@ -200,7 +280,12 @@ func (c *Controller) Reconcile(key utils.QueueKey) error { } if len(serviceCIDRS) == 0 { klog.Errorf("resolve serviceCIDRS for cluster %s failure", c.clusterName) - return err + svccidr, err := c.GetSvcByCreateInvalidSvc() + if err != nil { + klog.Errorf("get svc by creating invalid svc error: %v", err) + return err + } + serviceCIDRS = append(serviceCIDRS, svccidr.String()) } // sync pod cidr err = c.setClusterPodCIDRFun(reconcileCluster) diff --git a/pkg/clusterlink/controllers/node/node_controller.go b/pkg/clusterlink/controllers/node/node_controller.go index 68a230aed..59d2631f4 100644 --- a/pkg/clusterlink/controllers/node/node_controller.go +++ b/pkg/clusterlink/controllers/node/node_controller.go @@ -107,11 +107,34 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( }, }, } + cluster, err := r.ClusterLinkClient.KosmosV1alpha1().Clusters().Get(ctx, r.ClusterName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("get cluster %s err: %v", r.ClusterName, err) + return reconcile.Result{Requeue: true}, nil + } - err := CreateOrUpdateClusterNode(r.ClusterLinkClient, clusterNode, func(n *clusterlinkv1alpha1.ClusterNode) error { + var elasticIP string + elasticIPMap := cluster.Spec.ClusterLinkOptions.NodeElasticIPMap + if len(elasticIPMap) != 0 { + if elasticIPtoParse, ok := elasticIPMap[node.Name]; ok { + _, proto := ParseIP(elasticIPtoParse) + // Now elasticIP only support IPv4 + if proto == 4 { + elasticIP = elasticIPtoParse + } + } + } + err = CreateOrUpdateClusterNode(r.ClusterLinkClient, clusterNode, func(n *clusterlinkv1alpha1.ClusterNode) error { n.Spec.NodeName = node.Name n.Spec.ClusterName = r.ClusterName // n.Spec.InterfaceName while set by clusterlink-agent + n.Spec.ElasticIP = elasticIP + + if utils.NodeReady(&node) { + n.Status.NodeStatus = string(corev1.NodeReady) + } else { + n.Status.NodeStatus = "NotReady" + } return nil }) if err != nil { diff --git a/pkg/clusterlink/elector/elector.go b/pkg/clusterlink/elector/elector.go index 4d1004496..08b88bd54 100644 --- a/pkg/clusterlink/elector/elector.go +++ b/pkg/clusterlink/elector/elector.go @@ -2,12 +2,16 @@ package elector import ( "context" + "net" "os" + "sort" + apicorev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/role" @@ -32,12 +36,65 @@ func (e *Elector) EnsureGateWayRole() error { if err != nil { return err } - _, err = e.controlPanelClient.KosmosV1alpha1().Clusters().Get(context.TODO(), e.clusterName, metav1.GetOptions{}) + cluster, err := e.controlPanelClient.KosmosV1alpha1().Clusters().Get(context.TODO(), e.clusterName, metav1.GetOptions{}) if err != nil { return err } + if len(cluster.Spec.ClusterLinkOptions.NodeElasticIPMap) > 0 { + // TODO: it's not a good way, there is problem when one cluster has EIP and other clusters do not have, + var readyNodes = make([]string, 0, 5) + currentNodeName := os.Getenv(utils.EnvNodeName) + elasticIPMap := cluster.Spec.ClusterLinkOptions.NodeElasticIPMap + isCurrentNodeWithEIP := false + needReelect := true + + for nodeName := range elasticIPMap { + if nodeName == currentNodeName { + isCurrentNodeWithEIP = true + break + } + } + // check all node's elasticIP is valid + for nodeName := range elasticIPMap { + if net.ParseIP(elasticIPMap[nodeName]) == nil { + klog.Errorf("elasticIP %s is invalid", elasticIPMap[nodeName]) + continue + } + clusternode, err := e.controlPanelClient.KosmosV1alpha1().ClusterNodes().Get(context.TODO(), + node.ClusterNodeName(e.clusterName, nodeName), metav1.GetOptions{}) + if err != nil { + klog.Errorf("node %s is invalid: %v", nodeName, err) + continue + } + if len(clusternode.Status.NodeStatus) > 0 && + clusternode.Status.NodeStatus == string(apicorev1.NodeReady) { + // if some node with elasticIP is valid, don't need reelect + if clusternode.IsGateway() { + needReelect = false + e.nodeName = clusternode.Spec.NodeName + break + } + // put ready nodes with elasticIP into readyNodes slice + readyNodes = append(readyNodes, nodeName) + } + } + + if needReelect { + if !isCurrentNodeWithEIP && len(readyNodes) > 0 { + sort.Strings(readyNodes) + e.nodeName = readyNodes[0] + } else { + e.nodeName = os.Getenv(utils.EnvNodeName) + } + } + } else { + e.nodeName = os.Getenv(utils.EnvNodeName) + } + modifyNodes := e.genModifyNode(clusterNodes.Items) - klog.Infof("%d node need modify", len(modifyNodes)) + if len(modifyNodes) > 0 { + klog.Infof("%d node need modify", len(modifyNodes)) + } for i := range modifyNodes { node := modifyNodes[i] _, err := e.controlPanelClient.KosmosV1alpha1().ClusterNodes().Update(context.TODO(), &node, metav1.UpdateOptions{}) @@ -56,12 +113,12 @@ func (e *Elector) genModifyNode(clusterNodes []v1alpha1.ClusterNode) []v1alpha1. clusterNode := clusterNodes[i] isGateWay := clusterNode.IsGateway() isSameCluster := clusterNode.Spec.ClusterName == e.clusterName - isCurrentNode := clusterNode.Spec.NodeName == e.nodeName + isNewGwNode := clusterNode.Spec.NodeName == e.nodeName if isSameCluster { - if !isCurrentNode && isGateWay { + if !isNewGwNode && isGateWay { role.RemoveRole(&clusterNode, v1alpha1.RoleGateway) modifyNodes = append(modifyNodes, clusterNode) - } else if isCurrentNode && !isGateWay { + } else if isNewGwNode && !isGateWay { role.AddRole(&clusterNode, v1alpha1.RoleGateway) modifyNodes = append(modifyNodes, clusterNode) } diff --git a/pkg/clusterlink/network-manager/handlers/cni_support.go b/pkg/clusterlink/network-manager/handlers/cni_support.go new file mode 100644 index 000000000..4f7a8a2d8 --- /dev/null +++ b/pkg/clusterlink/network-manager/handlers/cni_support.go @@ -0,0 +1,38 @@ +package handlers + +import ( + "k8s.io/klog" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/clusterlink" + "github.com/kosmos.io/kosmos/pkg/clusterlink/network" +) + +type CNISupport struct { + Next +} + +func (h *CNISupport) Do(c *Context) (err error) { + nonMasqClusters, otherClusters := c.Filter.GetClusterByCNI(clusterlink.NonMasqCNISlice) + allClustes := append(nonMasqClusters, otherClusters...) + for _, nonMasqCluster := range nonMasqClusters { + var targetIPset []v1alpha1.IPset + for _, otherClusters := range allClustes { + if otherClusters.Name != nonMasqCluster.Name { + for _, cidr := range otherClusters.Status.ClusterLinkStatus.PodCIDRs { + targetIPset = append(targetIPset, v1alpha1.IPset{ + Name: network.KosmosIPsetVoidMasq, + CIDR: cidr, + }) + } + } + } + klog.Infof("flannel cluster name: %s, ipset: %v", nonMasqCluster.Name, targetIPset) + targetNodes := c.Filter.GetAllNodesByClusterName(nonMasqCluster.Name) + for _, node := range targetNodes { + c.Results[node.Name].IPsetsAvoidMasq = targetIPset + } + } + + return nil +} diff --git a/pkg/clusterlink/network-manager/handlers/nodeconfig.go b/pkg/clusterlink/network-manager/handlers/nodeconfig.go index e352c6671..6eb57bf8f 100644 --- a/pkg/clusterlink/network-manager/handlers/nodeconfig.go +++ b/pkg/clusterlink/network-manager/handlers/nodeconfig.go @@ -12,11 +12,14 @@ import ( // NodeConfig network configuration of the node type NodeConfig struct { - Devices []v1alpha1.Device `json:"devices,omitempty"` - Routes []v1alpha1.Route `json:"routes,omitempty"` - Iptables []v1alpha1.Iptables `json:"iptables,omitempty"` - Fdbs []v1alpha1.Fdb `json:"fdbs,omitempty"` - Arps []v1alpha1.Arp `json:"arps,omitempty"` + Devices []v1alpha1.Device `json:"devices,omitempty"` + Routes []v1alpha1.Route `json:"routes,omitempty"` + Iptables []v1alpha1.Iptables `json:"iptables,omitempty"` + Fdbs []v1alpha1.Fdb `json:"fdbs,omitempty"` + Arps []v1alpha1.Arp `json:"arps,omitempty"` + XfrmPolicies []v1alpha1.XfrmPolicy `json:"xfrmpolicies,omitempty"` + XfrmStates []v1alpha1.XfrmState `json:"xfrmstates,omitempty"` + IPsetsAvoidMasq []v1alpha1.IPset `json:"ipsetsavoidmasq,omitempty"` } func (c *NodeConfig) ToString() string { @@ -33,11 +36,14 @@ func (c *NodeConfig) ToJson() ([]byte, error) { func (c *NodeConfig) ConvertToNodeConfigSpec() v1alpha1.NodeConfigSpec { return v1alpha1.NodeConfigSpec{ - Devices: c.Devices, - Routes: c.Routes, - Iptables: c.Iptables, - Fdbs: c.Fdbs, - Arps: c.Arps, + Devices: c.Devices, + Routes: c.Routes, + Iptables: c.Iptables, + Fdbs: c.Fdbs, + Arps: c.Arps, + XfrmStates: c.XfrmStates, + XfrmPolicies: c.XfrmPolicies, + IPsetsAvoidMasqs: c.IPsetsAvoidMasq, } } @@ -47,4 +53,7 @@ func (c *NodeConfig) Sort() { sort.Sort(helpers.IptablesSorter(c.Iptables)) sort.Sort(helpers.ArpSorter(c.Arps)) sort.Sort(helpers.RouteSorter(c.Routes)) + sort.Sort(helpers.XfrmPolicySorter(c.XfrmPolicies)) + sort.Sort(helpers.XfrmStateSorter(c.XfrmStates)) + sort.Sort(helpers.IPSetSorter(c.IPsetsAvoidMasq)) } diff --git a/pkg/clusterlink/network-manager/handlers/pod_routes.go b/pkg/clusterlink/network-manager/handlers/pod_routes.go index 2d208f6df..e5d29e746 100644 --- a/pkg/clusterlink/network-manager/handlers/pod_routes.go +++ b/pkg/clusterlink/network-manager/handlers/pod_routes.go @@ -1,7 +1,13 @@ package handlers import ( + "bytes" + "crypto/md5" //nolint:gosec + "encoding/hex" + "fmt" + "hash/crc32" "net" + "os" "k8s.io/klog/v2" @@ -94,6 +100,78 @@ func SupportIPType(cluster *v1alpha1.Cluster, ipType helpers.IPType) bool { return specifiedIPType == ipType } +func addIpsecRules(ctx *Context, target *v1alpha1.ClusterNode, n *v1alpha1.ClusterNode, cidr string) { + nCluster := ctx.Filter.GetClusterByName(n.Spec.ClusterName) + var nPodCIDRs []string + if nCluster.IsP2P() { + nPodCIDRs = n.Spec.PodCIDRs + } else { + nPodCIDRs = nCluster.Status.ClusterLinkStatus.PodCIDRs + } + nPodCIDRs = FilterByIPFamily(nPodCIDRs, nCluster.Spec.ClusterLinkOptions.IPFamily) + nPodCIDRs = ConvertToGlobalCIDRs(nPodCIDRs, nCluster.Spec.ClusterLinkOptions.GlobalCIDRsMap) + var bt bytes.Buffer + if n.Name > target.Name { + bt.WriteString(n.Name) + bt.WriteString(target.Name) + } else { + bt.WriteString(target.Name) + bt.WriteString(n.Name) + } + spi := crc32.ChecksumIEEE(bt.Bytes()) + + psk_pre := md5.Sum([]byte(os.Getenv("PSK_PRE_STR"))) //nolint:gosec + psk_suffix := fmt.Sprintf("%08x", spi) + psk_suffix_byte, _ := hex.DecodeString(psk_suffix) + psk_byte := append(psk_pre[:], psk_suffix_byte...) + psk := hex.EncodeToString(psk_byte) + klog.Infof("psk_suffix: %s,psk: %s", psk_suffix, psk) + + ctx.Results[n.Name].XfrmStates = append(ctx.Results[n.Name].XfrmStates, v1alpha1.XfrmState{ + LeftIP: n.Spec.IP, + RightIP: target.Spec.ElasticIP, + ReqID: v1alpha1.DefaultReqID, + PSK: psk, + SPI: spi, + }) + ctx.Results[n.Name].XfrmStates = append(ctx.Results[n.Name].XfrmStates, v1alpha1.XfrmState{ + RightIP: n.Spec.IP, + LeftIP: target.Spec.ElasticIP, + ReqID: v1alpha1.DefaultReqID, + PSK: psk, + SPI: spi, + }) + for _, ncidr := range nPodCIDRs { + // dir : out + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: n.Spec.IP, + LeftNet: ncidr, + RightIP: target.Spec.ElasticIP, + RightNet: cidr, + ReqID: v1alpha1.DefaultReqID, + Dir: int(v1alpha1.IPSECOut), + }) + // dir : in + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: target.Spec.ElasticIP, + LeftNet: cidr, + RightIP: n.Spec.IP, + RightNet: ncidr, + ReqID: v1alpha1.DefaultReqID, + Dir: int(v1alpha1.IPSECIn), + }) + // dir : fwd + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: target.Spec.ElasticIP, + LeftNet: cidr, + RightIP: n.Spec.IP, + RightNet: ncidr, + ReqID: v1alpha1.DefaultReqID, + Dir: int(v1alpha1.IPSECFwd), + }) + } +} + func BuildRoutes(ctx *Context, target *v1alpha1.ClusterNode, cidrs []string) { otherClusterNodes := ctx.Filter.GetAllNodesExceptCluster(target.Spec.ClusterName) @@ -134,11 +212,16 @@ func BuildRoutes(ctx *Context, target *v1alpha1.ClusterNode, cidrs []string) { } if n.IsGateway() || srcCluster.IsP2P() { - ctx.Results[n.Name].Routes = append(ctx.Results[n.Name].Routes, v1alpha1.Route{ - CIDR: cidr, - Gw: targetIP.String(), - Dev: vxBridge, - }) + klog.Infof("Chekc node %s is gateway,t ElasticIP:%s,n ElasticIP: %s", n.Spec.NodeName, target.Spec.ElasticIP, n.Spec.ElasticIP) + if len(target.Spec.ElasticIP) > 0 && len(n.Spec.ElasticIP) > 0 { + addIpsecRules(ctx, target, n, cidr) + } else { + ctx.Results[n.Name].Routes = append(ctx.Results[n.Name].Routes, v1alpha1.Route{ + CIDR: cidr, + Gw: targetIP.String(), + Dev: vxBridge, + }) + } continue } diff --git a/pkg/clusterlink/network-manager/helpers/filter.go b/pkg/clusterlink/network-manager/helpers/filter.go index 67b8761da..6191e8f6d 100644 --- a/pkg/clusterlink/network-manager/helpers/filter.go +++ b/pkg/clusterlink/network-manager/helpers/filter.go @@ -2,6 +2,7 @@ package helpers import ( "reflect" + "sort" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" ) @@ -48,6 +49,23 @@ func (f *Filter) GetInternalNodes() []*v1alpha1.ClusterNode { return results } +func (f *Filter) GetClusterByCNI(cni []string) ([]*v1alpha1.Cluster, []*v1alpha1.Cluster) { + ss := sort.StringSlice(cni) + ss.Sort() + sc := []string(ss) + var targetCluster []*v1alpha1.Cluster + var otherCluster []*v1alpha1.Cluster + for _, cluster := range f.clusters { + pos := sort.SearchStrings(sc, cluster.Spec.ClusterLinkOptions.CNI) + if pos != len(sc) && sc[pos] == cluster.Spec.ClusterLinkOptions.CNI { + targetCluster = append(targetCluster, cluster) + } else { + otherCluster = append(otherCluster, cluster) + } + } + return targetCluster, otherCluster +} + func (f *Filter) GetEndpointNodes() []*v1alpha1.ClusterNode { var results []*v1alpha1.ClusterNode for _, node := range f.clusterNodes { diff --git a/pkg/clusterlink/network-manager/helpers/sort.go b/pkg/clusterlink/network-manager/helpers/sort.go index 4dffc0329..7fb87bc8c 100644 --- a/pkg/clusterlink/network-manager/helpers/sort.go +++ b/pkg/clusterlink/network-manager/helpers/sort.go @@ -90,3 +90,54 @@ func (s FdbSorter) Less(i, j int) bool { } return string(strI) > string(strJ) } + +// XfrmPolicySorter sorts xfrm policy. +type XfrmPolicySorter []v1alpha1.XfrmPolicy + +func (s XfrmPolicySorter) Len() int { return len(s) } +func (s XfrmPolicySorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s XfrmPolicySorter) Less(i, j int) bool { + strI, err := json.Marshal(s[i]) + if err != nil { + return i < j + } + strJ, err := json.Marshal(s[j]) + if err != nil { + return i < j + } + return string(strI) > string(strJ) +} + +// XfrmStateorter sorts xfrm policy. +type XfrmStateSorter []v1alpha1.XfrmState + +func (s XfrmStateSorter) Len() int { return len(s) } +func (s XfrmStateSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s XfrmStateSorter) Less(i, j int) bool { + strI, err := json.Marshal(s[i]) + if err != nil { + return i < j + } + strJ, err := json.Marshal(s[j]) + if err != nil { + return i < j + } + return string(strI) > string(strJ) +} + +// IPsetsorter sorts xfrm policy. +type IPSetSorter []v1alpha1.IPset + +func (s IPSetSorter) Len() int { return len(s) } +func (s IPSetSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s IPSetSorter) Less(i, j int) bool { + strI, err := json.Marshal(s[i]) + if err != nil { + return i < j + } + strJ, err := json.Marshal(s[j]) + if err != nil { + return i < j + } + return string(strI) > string(strJ) +} diff --git a/pkg/clusterlink/network-manager/network_manager.go b/pkg/clusterlink/network-manager/network_manager.go index e2d8a9064..6b4775611 100644 --- a/pkg/clusterlink/network-manager/network_manager.go +++ b/pkg/clusterlink/network-manager/network_manager.go @@ -83,7 +83,8 @@ func (n *Manager) CalculateNetworkConfigs(clusters []v1alpha1.Cluster, clusterNo SetNext(&handlers.VxLocalMacCache{}). SetNext(&handlers.VxBridgeMacCache{}). SetNext(&handlers.HostNetwork{}). - SetNext(&handlers.GlobalMap{}) + SetNext(&handlers.GlobalMap{}). + SetNext(&handlers.CNISupport{}) if err := rootHandler.Run(c); err != nil { return nil, fmt.Errorf("filed to calculate network config, err: %v", err) diff --git a/pkg/clusterlink/network/adapter.go b/pkg/clusterlink/network/adapter.go index c8ce08b80..921b18403 100644 --- a/pkg/clusterlink/network/adapter.go +++ b/pkg/clusterlink/network/adapter.go @@ -2,10 +2,13 @@ package network import ( "fmt" + "net" "github.com/pkg/errors" + "github.com/vishvananda/netlink" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" ) @@ -51,6 +54,27 @@ func (n *DefaultNetWork) LoadSysConfig() (*clusterlinkv1alpha1.NodeConfigSpec, e nodeConfigSpec.Arps = arps } + xfrmpolicies, err := loadXfrmPolicy() + if err != nil { + errs = errors.Wrap(err, fmt.Sprint(errs)) + } else { + nodeConfigSpec.XfrmPolicies = xfrmpolicies + } + + xfrmstates, err := loadXfrmState() + if err != nil { + errs = errors.Wrap(err, fmt.Sprint(errs)) + } else { + nodeConfigSpec.XfrmStates = xfrmstates + } + + ipsets, err := loadIPsetAvoidMasq() + if err != nil { + errs = errors.Wrap(err, fmt.Sprint(errs)) + } else { + nodeConfigSpec.IPsetsAvoidMasqs = ipsets + } + return nodeConfigSpec, errs } @@ -104,6 +128,17 @@ func (n *DefaultNetWork) DeleteDevices(devices []clusterlinkv1alpha1.Device) err return errs } +func (n *DefaultNetWork) DeleteIPsetsAvoidMasq(ipsets []clusterlinkv1alpha1.IPset) error { + var errs error + for _, ipset := range ipsets { + err := deleteIPset(ipset) + if err != nil { + errs = errors.Wrap(err, fmt.Sprintf("add ipset avoid masq error: %v", ipset)) + } + } + return errs +} + func (n *DefaultNetWork) UpdateArps([]clusterlinkv1alpha1.Arp) error { return ErrNotImplemented } @@ -124,6 +159,18 @@ func (n *DefaultNetWork) UpdateDevices([]clusterlinkv1alpha1.Device) error { return ErrNotImplemented } +func (n *DefaultNetWork) UpdateXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error { + return ErrNotImplemented +} + +func (n *DefaultNetWork) UpdateXfrmStates([]clusterlinkv1alpha1.XfrmState) error { + return ErrNotImplemented +} + +func (n *DefaultNetWork) UpdateIPsetsAvoidMasq([]clusterlinkv1alpha1.IPset) error { + return ErrNotImplemented +} + func (n *DefaultNetWork) AddArps(arps []clusterlinkv1alpha1.Arp) error { var errs error for _, arp := range arps { @@ -164,6 +211,117 @@ func (n *DefaultNetWork) AddRoutes(routes []clusterlinkv1alpha1.Route) error { return errs } +// For reference: +// https://github.com/flannel-io/flannel +func (n *DefaultNetWork) AddXfrmPolicies(xfrmpolicies []clusterlinkv1alpha1.XfrmPolicy) error { + var errs error + for _, xfrmpolicy := range xfrmpolicies { + srcIP := net.ParseIP(xfrmpolicy.LeftIP) + dstIP := net.ParseIP(xfrmpolicy.RightIP) + _, srcNet, _ := net.ParseCIDR(xfrmpolicy.LeftNet) + _, dstNet, _ := net.ParseCIDR(xfrmpolicy.RightNet) + reqID := xfrmpolicy.ReqID + + var err error + var xfrmpolicydir netlink.Dir + switch v1alpha1.IPSECDirection(xfrmpolicy.Dir) { + case v1alpha1.IPSECOut: + xfrmpolicydir = netlink.XFRM_DIR_OUT + case v1alpha1.IPSECIn: + xfrmpolicydir = netlink.XFRM_DIR_IN + case v1alpha1.IPSECFwd: + xfrmpolicydir = netlink.XFRM_DIR_FWD + } + err = AddXFRMPolicy(srcNet, dstNet, srcIP, dstIP, xfrmpolicydir, reqID) + if err != nil { + errs = errors.Wrap(err, fmt.Sprintf("error adding ipsec policy: %v", xfrmpolicy)) + } + } + return errs +} + +// For reference: +// https://github.com/flannel-io/flannel +func (n *DefaultNetWork) DeleteXfrmPolicies(xfrmpolicies []clusterlinkv1alpha1.XfrmPolicy) error { + var errs error + for _, xfrmpolicy := range xfrmpolicies { + srcIP := net.ParseIP(xfrmpolicy.LeftIP) + dstIP := net.ParseIP(xfrmpolicy.RightIP) + _, srcNet, _ := net.ParseCIDR(xfrmpolicy.LeftNet) + _, dstNet, _ := net.ParseCIDR(xfrmpolicy.RightNet) + reqID := xfrmpolicy.ReqID + + var xfrmpolicydir netlink.Dir + switch v1alpha1.IPSECDirection(xfrmpolicy.Dir) { + case v1alpha1.IPSECOut: + xfrmpolicydir = netlink.XFRM_DIR_OUT + case v1alpha1.IPSECIn: + xfrmpolicydir = netlink.XFRM_DIR_IN + case v1alpha1.IPSECFwd: + xfrmpolicydir = netlink.XFRM_DIR_FWD + } + + if reqID != v1alpha1.DefaultReqID { + klog.Info("Xfrm policy %v reqID is %d, not created by kosmos", xfrmpolicy, reqID) + continue + } + err := DeleteXFRMPolicy(srcNet, dstNet, srcIP, dstIP, xfrmpolicydir, reqID) + if err != nil { + errs = errors.Wrap(err, fmt.Sprintf("error deleting ipsec policy: %v", xfrmpolicy)) + } + } + return errs +} + +func (n *DefaultNetWork) AddXfrmStates(xfrmstates []clusterlinkv1alpha1.XfrmState) error { + var errs error + for _, xfrmstate := range xfrmstates { + srcIP := net.ParseIP(xfrmstate.LeftIP) + dstIP := net.ParseIP(xfrmstate.RightIP) + reqID := xfrmstate.ReqID + err := AddXFRMState(srcIP, dstIP, reqID, int(xfrmstate.SPI), xfrmstate.PSK) + if err != nil { + errs = errors.Wrap(err, fmt.Sprintf("error adding ipsec state: %v", xfrmstate)) + } + } + return errs +} + +func (n *DefaultNetWork) DeleteXfrmStates(xfrmstates []clusterlinkv1alpha1.XfrmState) error { + var errs error + for _, xfrmstate := range xfrmstates { + srcIP := net.ParseIP(xfrmstate.LeftIP) + dstIP := net.ParseIP(xfrmstate.RightIP) + reqID := xfrmstate.ReqID + if reqID != v1alpha1.DefaultReqID { + klog.Info("Xfrm state %v reqID is %d, not created by kosmos", xfrmstate, reqID) + continue + } + err := DeleteXFRMState(srcIP, dstIP, reqID, int(xfrmstate.SPI), xfrmstate.PSK) + if err != nil { + errs = errors.Wrap(err, fmt.Sprintf("error deleting ipsec state: %v", xfrmstate)) + } + } + return errs +} + +func (n *DefaultNetWork) AddIPsetsAvoidMasq(ipsets []clusterlinkv1alpha1.IPset) error { + var errs error + for _, ipset := range ipsets { + err := addIPset(ipset) + if err != nil { + errs = errors.Wrap(err, fmt.Sprintf("add ipset avoid masq error: %v", ipset)) + } + } + if len(ipsets) > 0 { + err := ensureAvoidMasqRule() + if err != nil { + errs = errors.Wrap(err, "create iptables rule to avoid masq,error") + } + } + return errs +} + func (n *DefaultNetWork) AddDevices(devices []clusterlinkv1alpha1.Device) error { var errs error for _, device := range devices { diff --git a/pkg/clusterlink/network/avoidmasq.go b/pkg/clusterlink/network/avoidmasq.go new file mode 100644 index 000000000..7e4b5df34 --- /dev/null +++ b/pkg/clusterlink/network/avoidmasq.go @@ -0,0 +1,122 @@ +package network + +import ( + "fmt" + "math/bits" + "net" + "syscall" + + ipt "github.com/coreos/go-iptables/iptables" + "github.com/pkg/errors" + "github.com/vishvananda/netlink" + "k8s.io/kubernetes/pkg/util/ipset" + proxyipset "k8s.io/kubernetes/pkg/util/ipset" + utilexec "k8s.io/utils/exec" + + clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + nmhelpers "github.com/kosmos.io/kosmos/pkg/clusterlink/network-manager/helpers" + "github.com/kosmos.io/kosmos/pkg/clusterlink/network/iptables" +) + +const ( + KosmosIPsetVoidMasq = "kosmosipset" +) + +func CaculateMaskSize(ipnet net.IPNet) uint8 { + var maskSize uint8 = 0 + for _, maskbyte := range []byte(ipnet.Mask) { + maskSize += uint8(bits.OnesCount8(maskbyte)) + } + return maskSize +} + +func ensureAvoidMasqRule() error { + iptableHandler, err := iptables.New(ipt.ProtocolIPv4) + if err != nil { + return err + } + + //ruleSpec := []string{"-m", "set", "--match-set", KosmosIPsetVoidMasq, "src", + // "-m", "set", "!", "--match-set", KosmosIPsetVoidMasq, "dst", "-j", "RETURN"} + ruleSpec := []string{"-m", "set", "--match-set", KosmosIPsetVoidMasq, "dst", "-j", "RETURN"} + + if err = iptableHandler.InsertUnique("nat", "POSTROUTING", 2, ruleSpec); err != nil { + return errors.Wrap(err, "unable to insert iptable rule in nat table to avoid masq") + } + return nil +} + +func addIPset(ipsetcidr clusterlinkv1alpha1.IPset) error { + if nmhelpers.GetIPType(ipsetcidr.CIDR) != nmhelpers.IPV4 { + return fmt.Errorf("only support ipv4,can't avoid cidr %s masq", ipsetcidr.CIDR) + } + + ipsetInterface := proxyipset.New(utilexec.New()) + + _, err := netlink.IpsetList(ipsetcidr.Name) + if err != nil { + if !errors.Is(err, syscall.ENOENT) { + return err + } + // kubeproxy ipset do not support hash:net type ipset,so use netlink + err = netlink.IpsetCreate(ipsetcidr.Name, "hash:net", netlink.IpsetCreateOptions{}) + if err != nil { + return err + } + } + ipsetToAdd := ipset.IPSet{ + Name: ipsetcidr.Name, + HashFamily: ipset.ProtocolFamilyIPV4, + SetType: ipset.Type("hash:net"), + MaxElem: 1048576, + Comment: "For kosmos", + } + //err := ipsetInterface.CreateSet(&ipsetToAdd, true) + //if err != nil { + // return err + //} + + // netlink don't support ipset protocol 7,so use kubeproxy way + err = ipsetInterface.AddEntry(ipsetcidr.CIDR, &ipsetToAdd, true) + if err != nil { + return err + } + return nil +} + +func deleteIPset(ipsetcidr clusterlinkv1alpha1.IPset) error { + ipsetInterface := proxyipset.New(utilexec.New()) + + err := ipsetInterface.DelEntry(ipsetcidr.CIDR, ipsetcidr.Name) + if err != nil { + return err + } + + return nil +} + +func loadIPsetAvoidMasq() ([]clusterlinkv1alpha1.IPset, error) { + return ListIPset([]string{KosmosIPsetVoidMasq}) +} + +func ListIPset(ipsetListNames []string) ([]clusterlinkv1alpha1.IPset, error) { + var errs error + ret := []clusterlinkv1alpha1.IPset{} + for _, ipsetName := range ipsetListNames { + ipsetRet, err := netlink.IpsetList(ipsetName) + if err != nil { + if !errors.Is(err, syscall.ENOENT) { + errs = errors.Wrap(err, fmt.Sprintf("error list ipset: %s", ipsetName)) + } + continue + } + + for _, entry := range ipsetRet.Entries { + ret = append(ret, clusterlinkv1alpha1.IPset{ + Name: ipsetName, + CIDR: fmt.Sprintf("%s/%d", entry.IP, entry.CIDR), + }) + } + } + return ret, errs +} diff --git a/pkg/clusterlink/network/interface.go b/pkg/clusterlink/network/interface.go index 58b6a43d1..a04f4eb6b 100644 --- a/pkg/clusterlink/network/interface.go +++ b/pkg/clusterlink/network/interface.go @@ -19,18 +19,27 @@ type NetWork interface { DeleteIptables([]clusterlinkv1alpha1.Iptables) error DeleteRoutes([]clusterlinkv1alpha1.Route) error DeleteDevices([]clusterlinkv1alpha1.Device) error + DeleteXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + DeleteXfrmStates([]clusterlinkv1alpha1.XfrmState) error + DeleteIPsetsAvoidMasq([]clusterlinkv1alpha1.IPset) error UpdateArps([]clusterlinkv1alpha1.Arp) error UpdateFdbs([]clusterlinkv1alpha1.Fdb) error UpdateIptables([]clusterlinkv1alpha1.Iptables) error UpdateRoutes([]clusterlinkv1alpha1.Route) error UpdateDevices([]clusterlinkv1alpha1.Device) error + UpdateXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + UpdateXfrmStates([]clusterlinkv1alpha1.XfrmState) error + UpdateIPsetsAvoidMasq([]clusterlinkv1alpha1.IPset) error AddArps([]clusterlinkv1alpha1.Arp) error AddFdbs([]clusterlinkv1alpha1.Fdb) error AddIptables([]clusterlinkv1alpha1.Iptables) error AddRoutes([]clusterlinkv1alpha1.Route) error AddDevices([]clusterlinkv1alpha1.Device) error + AddXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + AddXfrmStates([]clusterlinkv1alpha1.XfrmState) error + AddIPsetsAvoidMasq([]clusterlinkv1alpha1.IPset) error InitSys() diff --git a/pkg/clusterlink/network/iptables/iptables.go b/pkg/clusterlink/network/iptables/iptables.go index 8ce54f6da..eb53e71ed 100644 --- a/pkg/clusterlink/network/iptables/iptables.go +++ b/pkg/clusterlink/network/iptables/iptables.go @@ -26,6 +26,8 @@ import ( "github.com/coreos/go-iptables/iptables" "github.com/pkg/errors" + "k8s.io/klog" + "k8s.io/utils/exec" ) type Basic interface { @@ -57,6 +59,35 @@ type iptablesWrapper struct { var NewFunc func() (Interface, error) +// useful for tencent TKE +func init() { + errInfo := "select iptables-nft or iptables-legacy error" + execInterface := exec.New() + ret_nft, err := execInterface.Command("iptables-nft-save").CombinedOutput() + if err != nil { + klog.Errorf("%s: %v", errInfo, err) + return + } + ret_legacy, err := execInterface.Command("iptables-legacy-save").CombinedOutput() + if err != nil { + klog.Errorf("%s: %v", errInfo, err) + return + } + if len(ret_nft) > len(ret_legacy) { + klog.Info("use iptables-nft as default iptables") + _, err := execInterface.Command("ln", []string{"-sf", "/sbin/xtables-nft-multi", "/sbin/iptables"}...).CombinedOutput() + if err != nil { + klog.Errorf("%s: %v", errInfo, err) + return + } + _, err = execInterface.Command("ln", []string{"-sf", "/sbin/xtables-nft-multi", "/sbin/ip6tables"}...).CombinedOutput() + if err != nil { + klog.Errorf("%s: %v", errInfo, err) + return + } + } +} + func New(proto iptables.Protocol) (Interface, error) { if NewFunc != nil { return NewFunc() diff --git a/pkg/clusterlink/network/xfrm_policy.go b/pkg/clusterlink/network/xfrm_policy.go new file mode 100644 index 000000000..f9d56fbcc --- /dev/null +++ b/pkg/clusterlink/network/xfrm_policy.go @@ -0,0 +1,180 @@ +package network + +import ( + "encoding/hex" + "errors" + "fmt" + "net" + "syscall" + + "github.com/vishvananda/netlink" + log "k8s.io/klog/v2" + + clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" +) + +// For reference: +// https://github.com/flannel-io/flannel +func AddXFRMPolicy(srcNet, dstNet *net.IPNet, srcIP, dstIP net.IP, dir netlink.Dir, reqID int) error { + policy := &netlink.XfrmPolicy{ + Src: srcNet, + Dst: dstNet, + Dir: dir, + } + + tmpl := netlink.XfrmPolicyTmpl{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Reqid: reqID, + } + + policy.Tmpls = append(policy.Tmpls, tmpl) + + if existingPolicy, err := netlink.XfrmPolicyGet(policy); err != nil { + if errors.Is(err, syscall.ENOENT) { + log.Infof("Adding ipsec policy: %+v", tmpl) + if err := netlink.XfrmPolicyAdd(policy); err != nil { + return fmt.Errorf("error adding policy: %+v err: %v", policy, err) + } + } else { + return fmt.Errorf("error getting policy: %+v err: %v", policy, err) + } + } else { + log.Infof("Updating ipsec policy %+v with %+v", existingPolicy, policy) + if err := netlink.XfrmPolicyUpdate(policy); err != nil { + return fmt.Errorf("error updating policy: %+v err: %v", policy, err) + } + } + return nil +} + +func DeleteXFRMPolicy(srcNet, dstNet *net.IPNet, srcIP, dstIP net.IP, dir netlink.Dir, reqID int) error { + policy := netlink.XfrmPolicy{ + Src: srcNet, + Dst: dstNet, + Dir: dir, + } + + tmpl := netlink.XfrmPolicyTmpl{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Reqid: reqID, + } + + log.Infof("Deleting ipsec policy: %+v", tmpl) + + policy.Tmpls = append(policy.Tmpls, tmpl) + + if err := netlink.XfrmPolicyDel(&policy); err != nil { + return fmt.Errorf("error deleting policy: %+v err: %v", policy, err) + } + + return nil +} + +func AddXFRMState(srcIP, dstIP net.IP, reqID int, spi int, psk string) error { + k, _ := hex.DecodeString(psk) + state := netlink.XfrmState{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Spi: spi, + Reqid: reqID, + Aead: &netlink.XfrmStateAlgo{ + Name: "rfc4106(gcm(aes))", + Key: k, + ICVLen: 128, + }, + } + + if existingState, err := netlink.XfrmStateGet(&state); err != nil { + if errors.Is(err, syscall.ESRCH) || errors.Is(err, syscall.ENOENT) { + log.Infof("Adding xfrm state: %+v", state) + if err := netlink.XfrmStateAdd(&state); err != nil { + return fmt.Errorf("error adding state: %+v err: %v", state, err) + } + } else { + return fmt.Errorf("error getting state: %+v err: %v", state, err) + } + } else { + log.Infof("Updating xfrm state %+v with %+v", existingState, state) + if err := netlink.XfrmStateUpdate(&state); err != nil { + return fmt.Errorf("error updating state: %+v err: %v", state, err) + } + } + return nil +} + +func DeleteXFRMState(srcIP, dstIP net.IP, reqID int, spi int, psk string) error { + k, _ := hex.DecodeString(psk) + state := netlink.XfrmState{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Spi: spi, + Reqid: reqID, + Aead: &netlink.XfrmStateAlgo{ + Name: "rfc4106(gcm(aes))", + Key: k, + ICVLen: 128, + }, + } + log.Infof("Deleting ipsec state: %+v", state) + err := netlink.XfrmStateDel(&state) + if err != nil { + return fmt.Errorf("error delete xfrm state: %+v err: %v", state, err) + } + return nil +} + +func ListXfrmPolicy() ([]clusterlinkv1alpha1.XfrmPolicy, error) { + xfrmpolicies, err := netlink.XfrmPolicyList(netlink.FAMILY_ALL) + if err != nil { + return nil, fmt.Errorf("error list xfrm policy: %v", err) + } + var ret []clusterlinkv1alpha1.XfrmPolicy + for _, policy := range xfrmpolicies { + ret = append(ret, clusterlinkv1alpha1.XfrmPolicy{ + LeftIP: policy.Tmpls[0].Src.String(), + LeftNet: policy.Src.String(), + RightIP: policy.Tmpls[0].Dst.String(), + RightNet: policy.Dst.String(), + ReqID: policy.Tmpls[0].Reqid, + Dir: int(policy.Dir), + }) + } + return ret, nil +} + +func ListXfrmState() ([]clusterlinkv1alpha1.XfrmState, error) { + xfrmstates, err := netlink.XfrmStateList(netlink.FAMILY_ALL) + if err != nil { + return nil, fmt.Errorf("error list xfrm state: %v", err) + } + var ret []clusterlinkv1alpha1.XfrmState + for _, state := range xfrmstates { + k := hex.EncodeToString(state.Aead.Key) + ret = append(ret, clusterlinkv1alpha1.XfrmState{ + LeftIP: state.Src.String(), + RightIP: state.Dst.String(), + ReqID: state.Reqid, + PSK: k, + SPI: uint32(state.Spi), + }) + } + return ret, nil +} + +func loadXfrmPolicy() ([]clusterlinkv1alpha1.XfrmPolicy, error) { + return ListXfrmPolicy() +} + +func loadXfrmState() ([]clusterlinkv1alpha1.XfrmState, error) { + return ListXfrmState() +} diff --git a/pkg/kosmosctl/install/install.go b/pkg/kosmosctl/install/install.go index 6c7703e3b..fdd6206d3 100644 --- a/pkg/kosmosctl/install/install.go +++ b/pkg/kosmosctl/install/install.go @@ -3,6 +3,7 @@ package install import ( "context" "fmt" + "net" "os" "path/filepath" @@ -23,6 +24,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/cert" + clustercontrollers "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/cluster" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kosmosctl/join" "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" @@ -56,11 +58,14 @@ type CommandInstallOptions struct { HostKubeConfigStream []byte WaitTime int - CNI string - DefaultNICName string - NetworkType string - IpFamily string - UseProxy string + CNI string + DefaultNICName string + NetworkType string + IpFamily string + UseProxy string + NodeElasticIP map[string]string + ClusterPodCIDRs []string + UseExternalApiserver bool KosmosClient versioned.Interface K8sClient kubernetes.Interface @@ -100,6 +105,9 @@ func NewCmdInstall(f ctlutil.Factory) *cobra.Command { flags.StringVar(&o.IpFamily, "ip-family", string(v1alpha1.IPFamilyTypeIPV4), "Specify the IP protocol version used by network devices, common IP families include IPv4 and IPv6.") flags.StringVar(&o.UseProxy, "use-proxy", "false", "Set whether to enable proxy.") flags.IntVarP(&o.WaitTime, "wait-time", "", utils.DefaultWaitTime, "Wait the specified time for the Kosmos install ready.") + flags.StringToStringVar(&o.NodeElasticIP, "node-elasticip", nil, "Set cluster node with elastic ip.") + flags.StringSliceVar(&o.ClusterPodCIDRs, "cluster-pod-cidrs", nil, "Set cluster pods cidrs.") + flags.BoolVar(&o.UseExternalApiserver, "use-extelnal-apiserver", true, "Apiserver is a pod in cluster or not.") flags.StringVar(&o.CertEncode, "cert-encode", cert.GetCrtEncode(), "cert base64 string for node server.") flags.StringVar(&o.KeyEncode, "key-encode", cert.GetKeyEncode(), "key base64 string for node server.") @@ -154,6 +162,29 @@ func (o *CommandInstallOptions) Validate() error { return fmt.Errorf("kosmosctl install validate error, namespace is not valid") } + validationErr := "kosmosctl install validate error" + for nodeName, elasticIP := range o.NodeElasticIP { + _, err := o.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("%s, node %s is invalid: %v", validationErr, nodeName, err) + } + if net.ParseIP(elasticIP) == nil { + return fmt.Errorf("%s, ElasticIP %s is invalid", validationErr, elasticIP) + } + } + + if o.CNI == clustercontrollers.GlobalRouterCNI { + if o.ClusterPodCIDRs == nil { + return fmt.Errorf("%s, should specify ClusterPodCIDRs when using cni globalrouter", validationErr) + } + for _, podsCidr := range o.ClusterPodCIDRs { + if _, _, err := net.ParseCIDR(podsCidr); err != nil { + return fmt.Errorf("%s, pod cidr is invalid", validationErr) + } + } + o.UseExternalApiserver = true + } + return nil } @@ -293,6 +324,7 @@ func (o *CommandInstallOptions) runClusterlink() error { Namespace: o.Namespace, ImageRepository: o.ImageRegistry, Version: version.GetReleaseVersion().PatchRelease(), + PSKPreStr: v1alpha1.DefaultPSKPreStr, }) if err != nil { return err @@ -565,21 +597,24 @@ func (o *CommandInstallOptions) createControlCluster() error { if apierrors.IsNotFound(err) { clusterArgs := []string{"cluster"} joinOptions := join.CommandJoinOptions{ - Name: utils.DefaultClusterName, - Namespace: o.Namespace, - ImageRegistry: o.ImageRegistry, - KubeConfigStream: o.HostKubeConfigStream, - WaitTime: o.WaitTime, - KosmosClient: o.KosmosClient, - K8sClient: o.K8sClient, - K8sExtensionsClient: o.K8sExtensionsClient, - RootFlag: true, - EnableLink: true, - CNI: o.CNI, - DefaultNICName: o.DefaultNICName, - NetworkType: o.NetworkType, - IpFamily: o.IpFamily, - UseProxy: o.UseProxy, + Name: utils.DefaultClusterName, + Namespace: o.Namespace, + ImageRegistry: o.ImageRegistry, + KubeConfigStream: o.HostKubeConfigStream, + WaitTime: o.WaitTime, + KosmosClient: o.KosmosClient, + K8sClient: o.K8sClient, + K8sExtensionsClient: o.K8sExtensionsClient, + RootFlag: true, + EnableLink: true, + CNI: o.CNI, + DefaultNICName: o.DefaultNICName, + NetworkType: o.NetworkType, + IpFamily: o.IpFamily, + UseProxy: o.UseProxy, + NodeElasticIP: o.NodeElasticIP, + ClusterPodCIDRs: o.ClusterPodCIDRs, + UseExternalApiserver: o.UseExternalApiserver, } err = joinOptions.Run(clusterArgs) @@ -609,6 +644,9 @@ func (o *CommandInstallOptions) createControlCluster() error { case utils.DefaultIPv6: controlCluster.Spec.ClusterLinkOptions.IPFamily = v1alpha1.IPFamilyTypeIPV6 } + controlCluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP + controlCluster.Spec.ClusterLinkOptions.ClusterPodCIDRs = o.ClusterPodCIDRs + controlCluster.Spec.ClusterLinkOptions.UseExternalApiserver = o.UseExternalApiserver _, err = o.KosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), controlCluster, metav1.UpdateOptions{}) if err != nil { klog.Infof("ControlCluster-Link: ", controlCluster) @@ -659,22 +697,25 @@ func (o *CommandInstallOptions) createControlCluster() error { if apierrors.IsNotFound(err) { clusterArgs := []string{"cluster"} joinOptions := join.CommandJoinOptions{ - Name: utils.DefaultClusterName, - Namespace: o.Namespace, - ImageRegistry: o.ImageRegistry, - KubeConfigStream: o.HostKubeConfigStream, - K8sExtensionsClient: o.K8sExtensionsClient, - WaitTime: o.WaitTime, - KosmosClient: o.KosmosClient, - K8sClient: o.K8sClient, - RootFlag: true, - EnableLink: true, - CNI: o.CNI, - DefaultNICName: o.DefaultNICName, - NetworkType: o.NetworkType, - IpFamily: o.IpFamily, - UseProxy: o.UseProxy, - EnableTree: true, + Name: utils.DefaultClusterName, + Namespace: o.Namespace, + ImageRegistry: o.ImageRegistry, + KubeConfigStream: o.HostKubeConfigStream, + K8sExtensionsClient: o.K8sExtensionsClient, + WaitTime: o.WaitTime, + KosmosClient: o.KosmosClient, + K8sClient: o.K8sClient, + RootFlag: true, + EnableLink: true, + CNI: o.CNI, + DefaultNICName: o.DefaultNICName, + NetworkType: o.NetworkType, + IpFamily: o.IpFamily, + UseProxy: o.UseProxy, + EnableTree: true, + NodeElasticIP: o.NodeElasticIP, + ClusterPodCIDRs: o.ClusterPodCIDRs, + UseExternalApiserver: o.UseExternalApiserver, } err = joinOptions.Run(clusterArgs) @@ -705,6 +746,9 @@ func (o *CommandInstallOptions) createControlCluster() error { case utils.DefaultIPv6: controlCluster.Spec.ClusterLinkOptions.IPFamily = v1alpha1.IPFamilyTypeIPV6 } + controlCluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP + controlCluster.Spec.ClusterLinkOptions.ClusterPodCIDRs = o.ClusterPodCIDRs + controlCluster.Spec.ClusterLinkOptions.UseExternalApiserver = o.UseExternalApiserver _, err = o.KosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), controlCluster, metav1.UpdateOptions{}) if err != nil { klog.Infof("ControlCluster-All: ", controlCluster) diff --git a/pkg/kosmosctl/join/join.go b/pkg/kosmosctl/join/join.go index d8018a8e4..a36881683 100644 --- a/pkg/kosmosctl/join/join.go +++ b/pkg/kosmosctl/join/join.go @@ -3,6 +3,7 @@ package join import ( "context" "fmt" + "net" "os" "path/filepath" @@ -21,6 +22,7 @@ import ( "k8s.io/kubectl/pkg/util/templates" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + clustercontrollers "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/cluster" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" @@ -53,12 +55,15 @@ type CommandJoinOptions struct { RootFlag bool EnableAll bool - EnableLink bool - CNI string - DefaultNICName string - NetworkType string - IpFamily string - UseProxy string + EnableLink bool + CNI string + DefaultNICName string + NetworkType string + IpFamily string + UseProxy string + NodeElasticIP map[string]string + ClusterPodCIDRs []string + UseExternalApiserver bool EnableTree bool LeafModel string @@ -104,6 +109,9 @@ func NewCmdJoin(f ctlutil.Factory) *cobra.Command { flags.BoolVar(&o.EnableTree, "enable-tree", false, "Turn on clustertree.") flags.StringVar(&o.LeafModel, "leaf-model", "", "Set leaf cluster model, which supports one-to-one model.") flags.IntVarP(&o.WaitTime, "wait-time", "", utils.DefaultWaitTime, "Wait the specified time for the Kosmos install ready.") + flags.StringToStringVar(&o.NodeElasticIP, "node-elasticip", nil, "Set cluster node with elastic ip.") + flags.StringSliceVar(&o.ClusterPodCIDRs, "cluster-pod-cidrs", nil, "Set cluster pods cidrs.") + flags.BoolVar(&o.UseExternalApiserver, "use-extelnal-apiserver", true, "Apiserver is a pod in cluster or not.") return cmd } @@ -174,6 +182,17 @@ func (o *CommandJoinOptions) Validate(args []string) error { return fmt.Errorf("kosmosctl join validate error, namespace is not valid") } + validationErr := "kosmosctl join validate error" + for nodeName, elasticIP := range o.NodeElasticIP { + _, err := o.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("%s, node %s is invalid: %v", validationErr, nodeName, err) + } + if net.ParseIP(elasticIP) == nil { + return fmt.Errorf("%s, ElasticIP %s is invalid", validationErr, elasticIP) + } + } + switch args[0] { case "cluster": _, err := o.KosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), o.Name, metav1.GetOptions{}) @@ -184,6 +203,18 @@ func (o *CommandJoinOptions) Validate(args []string) error { } } + if o.CNI == clustercontrollers.GlobalRouterCNI { + if o.ClusterPodCIDRs == nil { + return fmt.Errorf("%s, should specify ClusterPodCIDRs when using cni globalrouter", validationErr) + } + for _, podsCidr := range o.ClusterPodCIDRs { + if _, _, err := net.ParseCIDR(podsCidr); err != nil { + return fmt.Errorf("%s, pod cidr is invalid", validationErr) + } + } + o.UseExternalApiserver = true + } + return nil } @@ -225,8 +256,9 @@ func (o *CommandJoinOptions) runCluster() error { IP: "210.0.0.0/8", IP6: "9470::0/16", }, - NetworkType: v1alpha1.NetWorkTypeGateWay, - IPFamily: v1alpha1.IPFamilyTypeIPV4, + NetworkType: v1alpha1.NetWorkTypeGateWay, + IPFamily: v1alpha1.IPFamilyTypeIPV4, + NodeElasticIPMap: o.NodeElasticIP, }, ClusterTreeOptions: &v1alpha1.ClusterTreeOptions{ Enable: o.EnableTree, @@ -253,6 +285,9 @@ func (o *CommandJoinOptions) runCluster() error { cluster.Spec.ClusterLinkOptions.DefaultNICName = o.DefaultNICName cluster.Spec.ClusterLinkOptions.CNI = o.CNI + cluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP + cluster.Spec.ClusterLinkOptions.ClusterPodCIDRs = o.ClusterPodCIDRs + cluster.Spec.ClusterLinkOptions.UseExternalApiserver = o.UseExternalApiserver } if o.EnableTree { diff --git a/pkg/kosmosctl/manifest/manifest_crds.go b/pkg/kosmosctl/manifest/manifest_crds.go index 1174c1550..945602370 100644 --- a/pkg/kosmosctl/manifest/manifest_crds.go +++ b/pkg/kosmosctl/manifest/manifest_crds.go @@ -347,6 +347,8 @@ spec: properties: clusterName: type: string + elasticip: + type: string interfaceName: type: string ip: @@ -365,16 +367,17 @@ spec: type: array type: object status: + properties: + nodeStatus: + type: string type: object required: - spec type: object served: true storage: true - subresources: - status: {} + subresources: {} ` - const Cluster = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -420,6 +423,8 @@ spec: properties: clusterLinkOptions: properties: + autodetectionMethod: + type: string bridgeCIDRs: default: ip: 220.0.0.0/8 @@ -433,6 +438,10 @@ spec: - ip - ip6 type: object + clusterpodCIDRs: + items: + type: string + type: array cni: default: calico type: string @@ -482,9 +491,17 @@ spec: - nodeName type: object type: array + nodeElasticIPMap: + additionalProperties: + type: string + description: NodeElasticIPMap presents mapping between nodename + in kubernetes and elasticIP + type: object useIPPool: default: false type: boolean + useexternalapiserver: + type: boolean type: object clusterTreeOptions: properties: @@ -652,7 +669,6 @@ spec: storage: true subresources: {} ` - const NodeConfig = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -747,6 +763,18 @@ spec: - mac type: object type: array + ipsetsavoidmasq: + items: + properties: + cidr: + type: string + name: + type: string + required: + - cidr + - name + type: object + type: array iptables: items: properties: @@ -777,6 +805,52 @@ spec: - gw type: object type: array + xfrmpolicies: + items: + properties: + dir: + type: integer + leftip: + type: string + leftnet: + type: string + reqid: + type: integer + rightip: + type: string + rightnet: + type: string + required: + - dir + - leftip + - leftnet + - reqid + - rightip + - rightnet + type: object + type: array + xfrmstates: + items: + properties: + PSK: + type: string + leftip: + type: string + reqid: + type: integer + rightip: + type: string + spi: + format: int32 + type: integer + required: + - PSK + - leftip + - reqid + - rightip + - spi + type: object + type: array type: object status: properties: @@ -795,7 +869,6 @@ spec: subresources: status: {} ` - const DaemonSet = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition diff --git a/pkg/kosmosctl/manifest/manifest_deployments.go b/pkg/kosmosctl/manifest/manifest_deployments.go index f888726b4..2d3e8fbc6 100644 --- a/pkg/kosmosctl/manifest/manifest_deployments.go +++ b/pkg/kosmosctl/manifest/manifest_deployments.go @@ -34,6 +34,9 @@ spec: requests: cpu: 500m memory: 500Mi + env: + - name: PSK_PRE_STR + value: "{{ .PSKPreStr }}" ` KosmosOperatorDeployment = ` @@ -268,5 +271,6 @@ type DeploymentReplace struct { ImageRepository string Version string - UseProxy string + UseProxy string + PSKPreStr string } diff --git a/vendor/k8s.io/kubernetes/pkg/util/ipset/OWNERS b/vendor/k8s.io/kubernetes/pkg/util/ipset/OWNERS new file mode 100644 index 000000000..c70337b9a --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/util/ipset/OWNERS @@ -0,0 +1,11 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +reviewers: + - sig-network-reviewers +approvers: + - sig-network-approvers +labels: + - sig/network +emeritus_approvers: + - brendandburns + - m1093782566 diff --git a/vendor/k8s.io/kubernetes/pkg/util/ipset/ipset.go b/vendor/k8s.io/kubernetes/pkg/util/ipset/ipset.go new file mode 100644 index 000000000..c82fe0c31 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/util/ipset/ipset.go @@ -0,0 +1,532 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipset + +import ( + "bytes" + "fmt" + "regexp" + "strconv" + "strings" + + "k8s.io/klog/v2" + utilexec "k8s.io/utils/exec" + netutils "k8s.io/utils/net" +) + +// Interface is an injectable interface for running ipset commands. Implementations must be goroutine-safe. +type Interface interface { + // FlushSet deletes all entries from a named set. + FlushSet(set string) error + // DestroySet deletes a named set. + DestroySet(set string) error + // DestroyAllSets deletes all sets. + DestroyAllSets() error + // CreateSet creates a new set. It will ignore error when the set already exists if ignoreExistErr=true. + CreateSet(set *IPSet, ignoreExistErr bool) error + // AddEntry adds a new entry to the named set. It will ignore error when the entry already exists if ignoreExistErr=true. + AddEntry(entry string, set *IPSet, ignoreExistErr bool) error + // DelEntry deletes one entry from the named set + DelEntry(entry string, set string) error + // Test test if an entry exists in the named set + TestEntry(entry string, set string) (bool, error) + // ListEntries lists all the entries from a named set + ListEntries(set string) ([]string, error) + // ListSets list all set names from kernel + ListSets() ([]string, error) + // GetVersion returns the "X.Y" version string for ipset. + GetVersion() (string, error) +} + +// IPSetCmd represents the ipset util. We use ipset command for ipset execute. +const IPSetCmd = "ipset" + +// EntryMemberPattern is the regular expression pattern of ipset member list. +// The raw output of ipset command `ipset list {set}` is similar to, +// Name: foobar +// Type: hash:ip,port +// Revision: 2 +// Header: family inet hashsize 1024 maxelem 65536 +// Size in memory: 16592 +// References: 0 +// Members: +// 192.168.1.2,tcp:8080 +// 192.168.1.1,udp:53 +var EntryMemberPattern = "(?m)^(.*\n)*Members:\n" + +// VersionPattern is the regular expression pattern of ipset version string. +// ipset version output is similar to "v6.10". +var VersionPattern = "v[0-9]+\\.[0-9]+" + +// IPSet implements an Interface to a set. +type IPSet struct { + // Name is the set name. + Name string + // SetType specifies the ipset type. + SetType Type + // HashFamily specifies the protocol family of the IP addresses to be stored in the set. + // The default is inet, i.e IPv4. If users want to use IPv6, they should specify inet6. + HashFamily string + // HashSize specifies the hash table size of ipset. + HashSize int + // MaxElem specifies the max element number of ipset. + MaxElem int + // PortRange specifies the port range of bitmap:port type ipset. + PortRange string + // comment message for ipset + Comment string +} + +// Validate checks if a given ipset is valid or not. +func (set *IPSet) Validate() error { + // Check if protocol is valid for `HashIPPort`, `HashIPPortIP` and `HashIPPortNet` type set. + if set.SetType == HashIPPort || set.SetType == HashIPPortIP || set.SetType == HashIPPortNet { + if err := validateHashFamily(set.HashFamily); err != nil { + return err + } + } + // check set type + if err := validateIPSetType(set.SetType); err != nil { + return err + } + // check port range for bitmap type set + if set.SetType == BitmapPort { + if err := validatePortRange(set.PortRange); err != nil { + return err + } + } + // check hash size value of ipset + if set.HashSize <= 0 { + return fmt.Errorf("invalid HashSize: %d", set.HashSize) + } + // check max elem value of ipset + if set.MaxElem <= 0 { + return fmt.Errorf("invalid MaxElem %d", set.MaxElem) + } + + return nil +} + +// setIPSetDefaults sets some IPSet fields if not present to their default values. +func (set *IPSet) setIPSetDefaults() { + // Setting default values if not present + if set.HashSize == 0 { + set.HashSize = 1024 + } + if set.MaxElem == 0 { + set.MaxElem = 65536 + } + // Default protocol is IPv4 + if set.HashFamily == "" { + set.HashFamily = ProtocolFamilyIPV4 + } + // Default ipset type is "hash:ip,port" + if len(set.SetType) == 0 { + set.SetType = HashIPPort + } + if len(set.PortRange) == 0 { + set.PortRange = DefaultPortRange + } +} + +// Entry represents a ipset entry. +type Entry struct { + // IP is the entry's IP. The IP address protocol corresponds to the HashFamily of IPSet. + // All entries' IP addresses in the same ip set has same the protocol, IPv4 or IPv6. + IP string + // Port is the entry's Port. + Port int + // Protocol is the entry's Protocol. The protocols of entries in the same ip set are all + // the same. The accepted protocols are TCP, UDP and SCTP. + Protocol string + // Net is the entry's IP network address. Network address with zero prefix size can NOT + // be stored. + Net string + // IP2 is the entry's second IP. IP2 may not be empty for `hash:ip,port,ip` type ip set. + IP2 string + // SetType is the type of ipset where the entry exists. + SetType Type +} + +// Validate checks if a given ipset entry is valid or not. The set parameter is the ipset that entry belongs to. +func (e *Entry) Validate(set *IPSet) bool { + if e.Port < 0 { + klog.Errorf("Entry %v port number %d should be >=0 for ipset %v", e, e.Port, set) + return false + } + switch e.SetType { + case HashIP: + //check if IP of Entry is valid. + if valid := e.checkIP(set); !valid { + return false + } + case HashIPPort: + //check if IP and Protocol of Entry is valid. + if valid := e.checkIPandProtocol(set); !valid { + return false + } + case HashIPPortIP: + //check if IP and Protocol of Entry is valid. + if valid := e.checkIPandProtocol(set); !valid { + return false + } + + // IP2 can not be empty for `hash:ip,port,ip` type ip set + if netutils.ParseIPSloppy(e.IP2) == nil { + klog.Errorf("Error parsing entry %v second ip address %v for ipset %v", e, e.IP2, set) + return false + } + case HashIPPortNet: + //check if IP and Protocol of Entry is valid. + if valid := e.checkIPandProtocol(set); !valid { + return false + } + + // Net can not be empty for `hash:ip,port,net` type ip set + if _, ipNet, err := netutils.ParseCIDRSloppy(e.Net); ipNet == nil { + klog.Errorf("Error parsing entry %v ip net %v for ipset %v, error: %v", e, e.Net, set, err) + return false + } + case BitmapPort: + // check if port number satisfies its ipset's requirement of port range + if set == nil { + klog.Errorf("Unable to reference ip set where the entry %v exists", e) + return false + } + begin, end, err := parsePortRange(set.PortRange) + if err != nil { + klog.Errorf("Failed to parse set %v port range %s for ipset %v, error: %v", set, set.PortRange, set, err) + return false + } + if e.Port < begin || e.Port > end { + klog.Errorf("Entry %v port number %d is not in the port range %s of its ipset %v", e, e.Port, set.PortRange, set) + return false + } + } + + return true +} + +// String returns the string format for ipset entry. +func (e *Entry) String() string { + switch e.SetType { + case HashIP: + // Entry{192.168.1.1} -> 192.168.1.1 + return fmt.Sprintf("%s", e.IP) + case HashIPPort: + // Entry{192.168.1.1, udp, 53} -> 192.168.1.1,udp:53 + // Entry{192.168.1.2, tcp, 8080} -> 192.168.1.2,tcp:8080 + return fmt.Sprintf("%s,%s:%s", e.IP, e.Protocol, strconv.Itoa(e.Port)) + case HashIPPortIP: + // Entry{192.168.1.1, udp, 53, 10.0.0.1} -> 192.168.1.1,udp:53,10.0.0.1 + // Entry{192.168.1.2, tcp, 8080, 192.168.1.2} -> 192.168.1.2,tcp:8080,192.168.1.2 + return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.IP2) + case HashIPPortNet: + // Entry{192.168.1.2, udp, 80, 10.0.1.0/24} -> 192.168.1.2,udp:80,10.0.1.0/24 + // Entry{192.168.2,25, tcp, 8080, 10.1.0.0/16} -> 192.168.2,25,tcp:8080,10.1.0.0/16 + return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.Net) + case BitmapPort: + // Entry{53} -> 53 + // Entry{8080} -> 8080 + return strconv.Itoa(e.Port) + } + return "" +} + +// checkIPandProtocol checks if IP and Protocol of Entry is valid. +func (e *Entry) checkIPandProtocol(set *IPSet) bool { + // set default protocol to tcp if empty + if len(e.Protocol) == 0 { + e.Protocol = ProtocolTCP + } else if !validateProtocol(e.Protocol) { + return false + } + return e.checkIP(set) +} + +// checkIP checks if IP of Entry is valid. +func (e *Entry) checkIP(set *IPSet) bool { + if netutils.ParseIPSloppy(e.IP) == nil { + klog.Errorf("Error parsing entry %v ip address %v for ipset %v", e, e.IP, set) + return false + } + + return true +} + +type runner struct { + exec utilexec.Interface +} + +// New returns a new Interface which will exec ipset. +func New(exec utilexec.Interface) Interface { + return &runner{ + exec: exec, + } +} + +// CreateSet creates a new set, it will ignore error when the set already exists if ignoreExistErr=true. +func (runner *runner) CreateSet(set *IPSet, ignoreExistErr bool) error { + // sets some IPSet fields if not present to their default values. + set.setIPSetDefaults() + + // Validate ipset before creating + if err := set.Validate(); err != nil { + return err + } + return runner.createSet(set, ignoreExistErr) +} + +// If ignoreExistErr is set to true, then the -exist option of ipset will be specified, ipset ignores the error +// otherwise raised when the same set (setname and create parameters are identical) already exists. +func (runner *runner) createSet(set *IPSet, ignoreExistErr bool) error { + args := []string{"create", set.Name, string(set.SetType)} + if set.SetType == HashIPPortIP || set.SetType == HashIPPort || set.SetType == HashIPPortNet || set.SetType == HashIP { + args = append(args, + "family", set.HashFamily, + "hashsize", strconv.Itoa(set.HashSize), + "maxelem", strconv.Itoa(set.MaxElem), + ) + } + if set.SetType == BitmapPort { + args = append(args, "range", set.PortRange) + } + if ignoreExistErr { + args = append(args, "-exist") + } + if _, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil { + return fmt.Errorf("error creating ipset %s, error: %v", set.Name, err) + } + return nil +} + +// AddEntry adds a new entry to the named set. +// If the -exist option is specified, ipset ignores the error otherwise raised when +// the same set (setname and create parameters are identical) already exists. +func (runner *runner) AddEntry(entry string, set *IPSet, ignoreExistErr bool) error { + args := []string{"add", set.Name, entry} + if ignoreExistErr { + args = append(args, "-exist") + } + if out, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil { + return fmt.Errorf("error adding entry %s, error: %v (%s)", entry, err, out) + } + return nil +} + +// DelEntry is used to delete the specified entry from the set. +func (runner *runner) DelEntry(entry string, set string) error { + if out, err := runner.exec.Command(IPSetCmd, "del", set, entry).CombinedOutput(); err != nil { + return fmt.Errorf("error deleting entry %s: from set: %s, error: %v (%s)", entry, set, err, out) + } + return nil +} + +// TestEntry is used to check whether the specified entry is in the set or not. +func (runner *runner) TestEntry(entry string, set string) (bool, error) { + if out, err := runner.exec.Command(IPSetCmd, "test", set, entry).CombinedOutput(); err == nil { + reg, e := regexp.Compile("is NOT in set " + set) + if e == nil && reg.MatchString(string(out)) { + return false, nil + } else if e == nil { + return true, nil + } else { + return false, fmt.Errorf("error testing entry: %s, error: %v", entry, e) + } + } else { + return false, fmt.Errorf("error testing entry %s: %v (%s)", entry, err, out) + } +} + +// FlushSet deletes all entries from a named set. +func (runner *runner) FlushSet(set string) error { + if _, err := runner.exec.Command(IPSetCmd, "flush", set).CombinedOutput(); err != nil { + return fmt.Errorf("error flushing set: %s, error: %v", set, err) + } + return nil +} + +// DestroySet is used to destroy a named set. +func (runner *runner) DestroySet(set string) error { + if out, err := runner.exec.Command(IPSetCmd, "destroy", set).CombinedOutput(); err != nil { + return fmt.Errorf("error destroying set %s, error: %v(%s)", set, err, out) + } + return nil +} + +// DestroyAllSets is used to destroy all sets. +func (runner *runner) DestroyAllSets() error { + if _, err := runner.exec.Command(IPSetCmd, "destroy").CombinedOutput(); err != nil { + return fmt.Errorf("error destroying all sets, error: %v", err) + } + return nil +} + +// ListSets list all set names from kernel +func (runner *runner) ListSets() ([]string, error) { + out, err := runner.exec.Command(IPSetCmd, "list", "-n").CombinedOutput() + if err != nil { + return nil, fmt.Errorf("error listing all sets, error: %v", err) + } + return strings.Split(string(out), "\n"), nil +} + +// ListEntries lists all the entries from a named set. +func (runner *runner) ListEntries(set string) ([]string, error) { + if len(set) == 0 { + return nil, fmt.Errorf("set name can't be nil") + } + out, err := runner.exec.Command(IPSetCmd, "list", set).CombinedOutput() + if err != nil { + return nil, fmt.Errorf("error listing set: %s, error: %v", set, err) + } + memberMatcher := regexp.MustCompile(EntryMemberPattern) + list := memberMatcher.ReplaceAllString(string(out[:]), "") + strs := strings.Split(list, "\n") + results := make([]string, 0) + for i := range strs { + if len(strs[i]) > 0 { + results = append(results, strs[i]) + } + } + return results, nil +} + +// GetVersion returns the version string. +func (runner *runner) GetVersion() (string, error) { + return getIPSetVersionString(runner.exec) +} + +// getIPSetVersionString runs "ipset --version" to get the version string +// in the form of "X.Y", i.e "6.19" +func getIPSetVersionString(exec utilexec.Interface) (string, error) { + cmd := exec.Command(IPSetCmd, "--version") + cmd.SetStdin(bytes.NewReader([]byte{})) + bytes, err := cmd.CombinedOutput() + if err != nil { + return "", err + } + versionMatcher := regexp.MustCompile(VersionPattern) + match := versionMatcher.FindStringSubmatch(string(bytes)) + if match == nil { + return "", fmt.Errorf("no ipset version found in string: %s", bytes) + } + return match[0], nil +} + +// checks if port range is valid. The begin port number is not necessarily less than +// end port number - ipset util can accept it. It means both 1-100 and 100-1 are valid. +func validatePortRange(portRange string) error { + strs := strings.Split(portRange, "-") + if len(strs) != 2 { + return fmt.Errorf("invalid PortRange: %q", portRange) + } + for i := range strs { + num, err := strconv.Atoi(strs[i]) + if err != nil { + return fmt.Errorf("invalid PortRange: %q", portRange) + } + if num < 0 { + return fmt.Errorf("invalid PortRange: %q", portRange) + } + } + return nil +} + +// checks if the given ipset type is valid. +func validateIPSetType(set Type) error { + for _, valid := range ValidIPSetTypes { + if set == valid { + return nil + } + } + return fmt.Errorf("unsupported SetType: %q", set) +} + +// checks if given hash family is supported in ipset +func validateHashFamily(family string) error { + if family == ProtocolFamilyIPV4 || family == ProtocolFamilyIPV6 { + return nil + } + return fmt.Errorf("unsupported HashFamily %q", family) +} + +// IsNotFoundError returns true if the error indicates "not found". It parses +// the error string looking for known values, which is imperfect but works in +// practice. +func IsNotFoundError(err error) bool { + es := err.Error() + if strings.Contains(es, "does not exist") { + // set with the same name already exists + // xref: https://github.com/Olipro/ipset/blob/master/lib/errcode.c#L32-L33 + return true + } + if strings.Contains(es, "element is missing") { + // entry is missing from the set + // xref: https://github.com/Olipro/ipset/blob/master/lib/parse.c#L1904 + // https://github.com/Olipro/ipset/blob/master/lib/parse.c#L1925 + return true + } + return false +} + +// checks if given protocol is supported in entry +func validateProtocol(protocol string) bool { + if protocol == ProtocolTCP || protocol == ProtocolUDP || protocol == ProtocolSCTP { + return true + } + klog.Errorf("Invalid entry's protocol: %s, supported protocols are [%s, %s, %s]", protocol, ProtocolTCP, ProtocolUDP, ProtocolSCTP) + return false +} + +// parsePortRange parse the begin and end port from a raw string(format: a-b). beginPort <= endPort +// in the return value. +func parsePortRange(portRange string) (beginPort int, endPort int, err error) { + if len(portRange) == 0 { + portRange = DefaultPortRange + } + + strs := strings.Split(portRange, "-") + if len(strs) != 2 { + // port number -1 indicates invalid + return -1, -1, fmt.Errorf("port range should be in the format of `a-b`") + } + for i := range strs { + num, err := strconv.Atoi(strs[i]) + if err != nil { + // port number -1 indicates invalid + return -1, -1, err + } + if num < 0 { + // port number -1 indicates invalid + return -1, -1, fmt.Errorf("port number %d should be >=0", num) + } + if i == 0 { + beginPort = num + continue + } + endPort = num + // switch when first port number > second port number + if beginPort > endPort { + endPort = beginPort + beginPort = num + } + } + return beginPort, endPort, nil +} + +var _ = Interface(&runner{}) diff --git a/vendor/k8s.io/kubernetes/pkg/util/ipset/types.go b/vendor/k8s.io/kubernetes/pkg/util/ipset/types.go new file mode 100644 index 000000000..11f98d712 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/util/ipset/types.go @@ -0,0 +1,65 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipset + +// Type represents the ipset type +type Type string + +const ( + // HashIPPort represents the `hash:ip,port` type ipset. The hash:ip,port is similar to hash:ip but + // you can store IP address and protocol-port pairs in it. TCP, SCTP, UDP, UDPLITE, ICMP and ICMPv6 are supported + // with port numbers/ICMP(v6) types and other protocol numbers without port information. + HashIPPort Type = "hash:ip,port" + // HashIPPortIP represents the `hash:ip,port,ip` type ipset. The hash:ip,port,ip set type uses a hash to store + // IP address, port number and a second IP address triples. The port number is interpreted together with a + // protocol (default TCP) and zero protocol number cannot be used. + HashIPPortIP Type = "hash:ip,port,ip" + // HashIPPortNet represents the `hash:ip,port,net` type ipset. The hash:ip,port,net set type uses a hash to store IP address, port number and IP network address triples. The port + // number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address + // with zero prefix size cannot be stored either. + HashIPPortNet Type = "hash:ip,port,net" + // BitmapPort represents the `bitmap:port` type ipset. The bitmap:port set type uses a memory range, where each bit + // represents one TCP/UDP port. A bitmap:port type of set can store up to 65535 ports. + BitmapPort Type = "bitmap:port" + // HashIP represents the `hash:ip` type ipset. + HashIP Type = "hash:ip" +) + +// DefaultPortRange defines the default bitmap:port valid port range. +const DefaultPortRange string = "0-65535" + +const ( + // ProtocolFamilyIPV4 represents IPv4 protocol. + ProtocolFamilyIPV4 = "inet" + // ProtocolFamilyIPV6 represents IPv6 protocol. + ProtocolFamilyIPV6 = "inet6" + // ProtocolTCP represents TCP protocol. + ProtocolTCP = "tcp" + // ProtocolUDP represents UDP protocol. + ProtocolUDP = "udp" + // ProtocolSCTP represents SCTP protocol. + ProtocolSCTP = "sctp" +) + +// ValidIPSetTypes defines the supported ip set type. +var ValidIPSetTypes = []Type{ + HashIPPort, + HashIPPortIP, + BitmapPort, + HashIPPortNet, + HashIP, +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6382b8dfd..1343cebd5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1774,6 +1774,7 @@ k8s.io/kubernetes/pkg/scheduler/profile k8s.io/kubernetes/pkg/scheduler/util k8s.io/kubernetes/pkg/securitycontext k8s.io/kubernetes/pkg/util/hash +k8s.io/kubernetes/pkg/util/ipset k8s.io/kubernetes/pkg/util/labels k8s.io/kubernetes/pkg/util/parsers k8s.io/kubernetes/pkg/util/taints