diff --git a/cmd/clusterlink/elector/app/elector.go b/cmd/clusterlink/elector/app/elector.go index e805615f1..334363e34 100644 --- a/cmd/clusterlink/elector/app/elector.go +++ b/cmd/clusterlink/elector/app/elector.go @@ -126,10 +126,11 @@ func run(ctx context.Context, opts *options.Options) error { err := elector.EnsureGateWayRole() if err != nil { klog.Errorf("set gateway role failure: %v, retry after 10 sec.", err) - time.Sleep(10 * time.Second) + time.Sleep(3 * time.Second) } else { - klog.V(4).Info("ensure gateway role success, recheck after 60 sec.") - time.Sleep(60 * time.Second) + timeToRecheck := 3 * time.Second + klog.V(4).Infof("ensure gateway role success, recheck after %d sec.", int(timeToRecheck)) + time.Sleep(timeToRecheck) } } } diff --git a/deploy/crds/kosmos.io_clusternodes.yaml b/deploy/crds/kosmos.io_clusternodes.yaml index fb61383c0..9d5dace88 100644 --- a/deploy/crds/kosmos.io_clusternodes.yaml +++ b/deploy/crds/kosmos.io_clusternodes.yaml @@ -45,6 +45,8 @@ spec: properties: clusterName: type: string + elasticip: + type: string interfaceName: type: string ip: @@ -63,11 +65,13 @@ spec: type: array type: object status: + properties: + nodeStatus: + type: string type: object required: - spec type: object served: true storage: true - subresources: - status: {} + subresources: {} diff --git a/deploy/crds/kosmos.io_clusters.yaml b/deploy/crds/kosmos.io_clusters.yaml index d96895c13..2f20e84bd 100644 --- a/deploy/crds/kosmos.io_clusters.yaml +++ b/deploy/crds/kosmos.io_clusters.yaml @@ -105,6 +105,12 @@ spec: - nodeName type: object type: array + nodeElasticIPMap: + additionalProperties: + type: string + description: NodeElasticIPMap presents mapping between nodename + in kubernetes and elasticIP + type: object useIPPool: default: false type: boolean diff --git a/deploy/crds/kosmos.io_nodeconfigs.yaml b/deploy/crds/kosmos.io_nodeconfigs.yaml index 78679263e..65483ce1a 100644 --- a/deploy/crds/kosmos.io_nodeconfigs.yaml +++ b/deploy/crds/kosmos.io_nodeconfigs.yaml @@ -122,6 +122,48 @@ spec: - gw type: object type: array + xfrmpolicies: + items: + properties: + dir: + type: integer + leftip: + type: string + leftnet: + type: string + reqid: + type: integer + rightip: + type: string + rightnet: + type: string + required: + - dir + - leftip + - leftnet + - reqid + - rightip + - rightnet + type: object + type: array + xfrmstates: + items: + properties: + PSK: + type: string + leftip: + type: string + reqid: + type: integer + rightip: + type: string + required: + - PSK + - leftip + - reqid + - rightip + type: object + type: array type: object status: properties: diff --git a/pkg/apis/kosmos/v1alpha1/cluster_types.go b/pkg/apis/kosmos/v1alpha1/cluster_types.go index 1d997455c..c3b18e1cb 100644 --- a/pkg/apis/kosmos/v1alpha1/cluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/cluster_types.go @@ -91,6 +91,10 @@ type ClusterLinkOptions struct { // +optional GlobalCIDRsMap map[string]string `json:"globalCIDRsMap,omitempty"` + + // NodeElasticIPMap presents mapping between nodename in kubernetes and elasticIP + // +optional + NodeElasticIPMap map[string]string `json:"nodeElasticIPMap,omitempty"` } type ClusterTreeOptions struct { diff --git a/pkg/apis/kosmos/v1alpha1/clusternode_types.go b/pkg/apis/kosmos/v1alpha1/clusternode_types.go index 8829d55b2..6708419d4 100644 --- a/pkg/apis/kosmos/v1alpha1/clusternode_types.go +++ b/pkg/apis/kosmos/v1alpha1/clusternode_types.go @@ -8,7 +8,6 @@ import ( // +genclient // +genclient:nonNamespaced -// +kubebuilder:subresource:status // +kubebuilder:resource:scope="Cluster" // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:printcolumn:name="ROLES",type=string,JSONPath=`.spec.roles` @@ -33,6 +32,8 @@ type ClusterNodeSpec struct { // +optional IP string `json:"ip,omitempty"` // +optional + ElasticIP string `json:"elasticip,omitempty"` + // +optional IP6 string `json:"ip6,omitempty"` // +optional Roles []Role `json:"roles,omitempty"` @@ -41,6 +42,8 @@ type ClusterNodeSpec struct { } type ClusterNodeStatus struct { + // +optional + NodeStatus string `json:"nodeStatus,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/kosmos/v1alpha1/constants.go b/pkg/apis/kosmos/v1alpha1/constants.go index 56b37c550..116868cd2 100644 --- a/pkg/apis/kosmos/v1alpha1/constants.go +++ b/pkg/apis/kosmos/v1alpha1/constants.go @@ -28,3 +28,16 @@ type DeviceType string const ( VxlanDevice DeviceType = "vxlan" ) + +const ( + PSK string = "bfd6224354977084568832b811226b3d6cff6685" + ReqID int = 336 +) + +type IPSECDirection int + +const ( + IPSECIn IPSECDirection = 0 + IPSECOut IPSECDirection = 1 + IPSECFwd IPSECDirection = 2 +) diff --git a/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go b/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go index 1f4a77a02..cf7b8f9fc 100644 --- a/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go +++ b/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go @@ -21,11 +21,13 @@ type NodeConfig struct { } type NodeConfigSpec struct { - Devices []Device `json:"devices,omitempty"` - Routes []Route `json:"routes,omitempty"` - Iptables []Iptables `json:"iptables,omitempty"` - Fdbs []Fdb `json:"fdbs,omitempty"` - Arps []Arp `json:"arps,omitempty"` + Devices []Device `json:"devices,omitempty"` + Routes []Route `json:"routes,omitempty"` + Iptables []Iptables `json:"iptables,omitempty"` + Fdbs []Fdb `json:"fdbs,omitempty"` + Arps []Arp `json:"arps,omitempty"` + XfrmPolicies []XfrmPolicy `json:"xfrmpolicies,omitempty"` + XfrmStates []XfrmState `json:"xfrmstates,omitempty"` } type NodeConfigStatus struct { @@ -101,6 +103,38 @@ func (a *Arp) Compare(v Arp) bool { a.Dev == v.Dev } +type XfrmPolicy struct { + LeftIP string `json:"leftip"` + LeftNet string `json:"leftnet"` + RightIP string `json:"rightip"` + RightNet string `json:"rightnet"` + ReqID int `json:"reqid"` + Dir int `json:"dir"` +} + +func (a *XfrmPolicy) Compare(v XfrmPolicy) bool { + return a.LeftIP == v.LeftIP && + a.LeftNet == v.LeftNet && + a.RightNet == v.RightNet && + a.RightIP == v.RightIP && + a.ReqID == v.ReqID && + a.Dir == v.Dir +} + +type XfrmState struct { + LeftIP string `json:"leftip"` + RightIP string `json:"rightip"` + ReqID int `json:"reqid"` + PSK string `json:"PSK"` +} + +func (a *XfrmState) Compare(v XfrmState) bool { + return a.LeftIP == v.LeftIP && + a.RightIP == v.RightIP && + a.ReqID == v.ReqID && + a.PSK == v.PSK +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type NodeConfigList struct { diff --git a/pkg/clusterlink/agent/network-manager/network_manager.go b/pkg/clusterlink/agent/network-manager/network_manager.go index f1898fa55..4583b71d8 100644 --- a/pkg/clusterlink/agent/network-manager/network_manager.go +++ b/pkg/clusterlink/agent/network-manager/network_manager.go @@ -112,6 +112,21 @@ func (e *NetworkManager) Diff(oldConfig, newConfig *clusterlinkv1alpha1.NodeConf createConfig.Routes = createRecord isSame = false } + // ipsec: + if flag, deleteRecord, createRecord := compareFunc(oldConfig.XfrmPolicies, newConfig.XfrmPolicies, func(a, b clusterlinkv1alpha1.XfrmPolicy) bool { + return a.Compare(b) + }); !flag { + deleteConfig.XfrmPolicies = deleteRecord + createConfig.XfrmPolicies = createRecord + isSame = false + } + if flag, deleteRecord, createRecord := compareFunc(oldConfig.XfrmStates, newConfig.XfrmStates, func(a, b clusterlinkv1alpha1.XfrmState) bool { + return a.Compare(b) + }); !flag { + deleteConfig.XfrmStates = deleteRecord + createConfig.XfrmStates = createRecord + isSame = false + } // iptables: if flag, deleteRecord, createRecord := compareFunc(oldConfig.Iptables, newConfig.Iptables, func(a, b clusterlinkv1alpha1.Iptables) bool { return a.Compare(b) @@ -188,6 +203,18 @@ func (e *NetworkManager) WriteSys(configDiff *ConfigDiff) error { errs = errors.Wrap(err, fmt.Sprint(errs)) } } + if config.XfrmPolicies != nil { + if err := e.NetworkInterface.DeleteXfrmPolicies(config.XfrmPolicies); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.XfrmStates != nil { + if err := e.NetworkInterface.DeleteXfrmStates(config.XfrmStates); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } } if configDiff.createConfig != nil { @@ -223,6 +250,18 @@ func (e *NetworkManager) WriteSys(configDiff *ConfigDiff) error { errs = errors.Wrap(err, fmt.Sprint(errs)) } } + if config.XfrmPolicies != nil { + if err := e.NetworkInterface.AddXfrmPolicies(config.XfrmPolicies); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.XfrmStates != nil { + if err := e.NetworkInterface.AddXfrmStates(config.XfrmStates); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } } return errs @@ -254,11 +293,13 @@ func (e *NetworkManager) UpdateFromChecker() NodeConfigSyncStatus { } func printNodeConfig(data *clusterlinkv1alpha1.NodeConfigSpec) { - klog.Infof("device: ", data.Devices) - klog.Infof("Arps: ", data.Arps) - klog.Infof("Fdbs: ", data.Fdbs) - klog.Infof("Iptables: ", data.Iptables) - klog.Infof("Routes: ", data.Routes) + klog.Infof("device: %v", data.Devices) + klog.Infof("Arps: %v", data.Arps) + klog.Infof("Fdbs: %v", data.Fdbs) + klog.Infof("Iptables: %v", data.Iptables) + klog.Infof("Routes: %v", data.Routes) + klog.Infof("XfrmPolicys: %v", data.XfrmPolicies) + klog.Infof("XfrmStates: %v", data.XfrmStates) } func (e *NetworkManager) UpdateSync() NodeConfigSyncStatus { diff --git a/pkg/clusterlink/controllers/node/node_controller.go b/pkg/clusterlink/controllers/node/node_controller.go index 65cd51494..7cbcc61cc 100644 --- a/pkg/clusterlink/controllers/node/node_controller.go +++ b/pkg/clusterlink/controllers/node/node_controller.go @@ -114,12 +114,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{Requeue: true}, nil } + var elasticIP string + elasticIPMap := cluster.Spec.ClusterLinkOptions.NodeElasticIPMap + if len(elasticIPMap) != 0 { + if elasticIPtoParse, ok := elasticIPMap[node.Name]; ok { + _, proto := ParseIP(elasticIPtoParse) + // Now elasticIP only support IPv4 + if proto == 4 { + elasticIP = elasticIPtoParse + } + } + } err = CreateOrUpdateClusterNode(r.ClusterLinkClient, clusterNode, func(n *clusterlinkv1alpha1.ClusterNode) error { n.Spec.NodeName = node.Name n.Spec.ClusterName = r.ClusterName n.Spec.IP = internalIP n.Spec.IP6 = internalIP6 n.Spec.InterfaceName = interfacepolicy.GetInterfaceName(cluster.Spec.ClusterLinkOptions.NICNodeNames, node.Name, cluster.Spec.ClusterLinkOptions.DefaultNICName) + n.Spec.ElasticIP = elasticIP + + if utils.NodeReady(&node) { + n.Status.NodeStatus = string(corev1.NodeReady) + } return nil }) if err != nil { diff --git a/pkg/clusterlink/elector/elector.go b/pkg/clusterlink/elector/elector.go index 4d1004496..014dbb844 100644 --- a/pkg/clusterlink/elector/elector.go +++ b/pkg/clusterlink/elector/elector.go @@ -2,12 +2,16 @@ package elector import ( "context" + "net" "os" + "sort" + apicorev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/role" @@ -32,12 +36,53 @@ func (e *Elector) EnsureGateWayRole() error { if err != nil { return err } - _, err = e.controlPanelClient.KosmosV1alpha1().Clusters().Get(context.TODO(), e.clusterName, metav1.GetOptions{}) + cluster, err := e.controlPanelClient.KosmosV1alpha1().Clusters().Get(context.TODO(), e.clusterName, metav1.GetOptions{}) if err != nil { return err } + if len(cluster.Spec.ClusterLinkOptions.NodeElasticIPMap) > 0 { + var readyNodes = make([]string, 0, 5) + currentNodeName := os.Getenv(utils.EnvNodeName) + elasticIPMap := cluster.Spec.ClusterLinkOptions.NodeElasticIPMap + isCurrentNodeIn := false + for nodeName := range elasticIPMap { + if nodeName == currentNodeName { + isCurrentNodeIn = true + break + } + } + if !isCurrentNodeIn { + for nodeName := range elasticIPMap { + clusternode, err := e.controlPanelClient.KosmosV1alpha1().ClusterNodes().Get(context.TODO(), + node.ClusterNodeName(e.clusterName, nodeName), metav1.GetOptions{}) + if err != nil { + klog.Errorf("node %s is invalid: %v", nodeName, err) + continue + } + if net.ParseIP(elasticIPMap[nodeName]) == nil { + klog.Errorf("elasticIP %s is invalid", elasticIPMap[nodeName]) + continue + } + if len(clusternode.Status.NodeStatus) > 0 && + clusternode.Status.NodeStatus == string(apicorev1.NodeReady) { + readyNodes = append(readyNodes, nodeName) + } + } + } + if len(readyNodes) > 0 { + sort.Strings(readyNodes) + e.nodeName = readyNodes[0] + } else { + e.nodeName = os.Getenv(utils.EnvNodeName) + } + } else { + e.nodeName = os.Getenv(utils.EnvNodeName) + } + modifyNodes := e.genModifyNode(clusterNodes.Items) - klog.Infof("%d node need modify", len(modifyNodes)) + if len(modifyNodes) > 0 { + klog.Infof("%d node need modify", len(modifyNodes)) + } for i := range modifyNodes { node := modifyNodes[i] _, err := e.controlPanelClient.KosmosV1alpha1().ClusterNodes().Update(context.TODO(), &node, metav1.UpdateOptions{}) @@ -56,12 +101,12 @@ func (e *Elector) genModifyNode(clusterNodes []v1alpha1.ClusterNode) []v1alpha1. clusterNode := clusterNodes[i] isGateWay := clusterNode.IsGateway() isSameCluster := clusterNode.Spec.ClusterName == e.clusterName - isCurrentNode := clusterNode.Spec.NodeName == e.nodeName + isNewGwNode := clusterNode.Spec.NodeName == e.nodeName if isSameCluster { - if !isCurrentNode && isGateWay { + if !isNewGwNode && isGateWay { role.RemoveRole(&clusterNode, v1alpha1.RoleGateway) modifyNodes = append(modifyNodes, clusterNode) - } else if isCurrentNode && !isGateWay { + } else if isNewGwNode && !isGateWay { role.AddRole(&clusterNode, v1alpha1.RoleGateway) modifyNodes = append(modifyNodes, clusterNode) } diff --git a/pkg/clusterlink/network-manager/handlers/nodeconfig.go b/pkg/clusterlink/network-manager/handlers/nodeconfig.go index e352c6671..d1f1f5165 100644 --- a/pkg/clusterlink/network-manager/handlers/nodeconfig.go +++ b/pkg/clusterlink/network-manager/handlers/nodeconfig.go @@ -12,11 +12,13 @@ import ( // NodeConfig network configuration of the node type NodeConfig struct { - Devices []v1alpha1.Device `json:"devices,omitempty"` - Routes []v1alpha1.Route `json:"routes,omitempty"` - Iptables []v1alpha1.Iptables `json:"iptables,omitempty"` - Fdbs []v1alpha1.Fdb `json:"fdbs,omitempty"` - Arps []v1alpha1.Arp `json:"arps,omitempty"` + Devices []v1alpha1.Device `json:"devices,omitempty"` + Routes []v1alpha1.Route `json:"routes,omitempty"` + Iptables []v1alpha1.Iptables `json:"iptables,omitempty"` + Fdbs []v1alpha1.Fdb `json:"fdbs,omitempty"` + Arps []v1alpha1.Arp `json:"arps,omitempty"` + XfrmPolicies []v1alpha1.XfrmPolicy `json:"xfrmpolicies,omitempty"` + XfrmStates []v1alpha1.XfrmState `json:"xfrmstates,omitempty"` } func (c *NodeConfig) ToString() string { @@ -33,11 +35,13 @@ func (c *NodeConfig) ToJson() ([]byte, error) { func (c *NodeConfig) ConvertToNodeConfigSpec() v1alpha1.NodeConfigSpec { return v1alpha1.NodeConfigSpec{ - Devices: c.Devices, - Routes: c.Routes, - Iptables: c.Iptables, - Fdbs: c.Fdbs, - Arps: c.Arps, + Devices: c.Devices, + Routes: c.Routes, + Iptables: c.Iptables, + Fdbs: c.Fdbs, + Arps: c.Arps, + XfrmStates: c.XfrmStates, + XfrmPolicies: c.XfrmPolicies, } } diff --git a/pkg/clusterlink/network-manager/handlers/pod_routes.go b/pkg/clusterlink/network-manager/handlers/pod_routes.go index 401ef059c..a452fe5db 100644 --- a/pkg/clusterlink/network-manager/handlers/pod_routes.go +++ b/pkg/clusterlink/network-manager/handlers/pod_routes.go @@ -134,11 +134,62 @@ func BuildRoutes(ctx *Context, target *v1alpha1.ClusterNode, cidrs []string) { } if n.IsGateway() || srcCluster.IsP2P() { - ctx.Results[n.Name].Routes = append(ctx.Results[n.Name].Routes, v1alpha1.Route{ - CIDR: cidr, - Gw: targetIP.String(), - Dev: vxBridge, - }) + klog.Infof("Chekc node %s is gateway,t ElasticIP:%s,n ElasticIP: %s", n.Spec.NodeName, target.Spec.ElasticIP, n.Spec.ElasticIP) + if len(target.Spec.ElasticIP) > 0 && len(n.Spec.ElasticIP) > 0 { + nCluster := ctx.Filter.GetClusterByName(n.Spec.ClusterName) + var nPodCIDRs []string + if nCluster.IsP2P() { + nPodCIDRs = n.Spec.PodCIDRs + } else { + nPodCIDRs = nCluster.Status.ClusterLinkStatus.PodCIDRs + } + nPodCIDRs = FilterByIPFamily(nPodCIDRs, nCluster.Spec.ClusterLinkOptions.IPFamily) + nPodCIDRs = ConvertToGlobalCIDRs(nPodCIDRs, nCluster.Spec.ClusterLinkOptions.GlobalCIDRsMap) + ctx.Results[n.Name].XfrmStates = append(ctx.Results[n.Name].XfrmStates, v1alpha1.XfrmState{ + LeftIP: n.Spec.IP, + RightIP: target.Spec.ElasticIP, + ReqID: v1alpha1.ReqID, + PSK: v1alpha1.PSK, + }) + ctx.Results[n.Name].XfrmStates = append(ctx.Results[n.Name].XfrmStates, v1alpha1.XfrmState{ + RightIP: n.Spec.ElasticIP, + LeftIP: target.Spec.IP, + ReqID: v1alpha1.ReqID, + PSK: v1alpha1.PSK, + }) + for _, ncidr := range nPodCIDRs { + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: n.Spec.IP, + LeftNet: ncidr, + RightIP: target.Spec.ElasticIP, + RightNet: cidr, + ReqID: v1alpha1.ReqID, + Dir: int(v1alpha1.IPSECOut), + }) + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: target.Spec.ElasticIP, + LeftNet: cidr, + RightIP: n.Spec.IP, + RightNet: ncidr, + ReqID: v1alpha1.ReqID, + Dir: int(v1alpha1.IPSECIn), + }) + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: target.Spec.ElasticIP, + LeftNet: cidr, + RightIP: n.Spec.IP, + RightNet: ncidr, + ReqID: v1alpha1.ReqID, + Dir: int(v1alpha1.IPSECFwd), + }) + } + } else { + ctx.Results[n.Name].Routes = append(ctx.Results[n.Name].Routes, v1alpha1.Route{ + CIDR: cidr, + Gw: targetIP.String(), + Dev: vxBridge, + }) + } continue } diff --git a/pkg/clusterlink/network/adapter.go b/pkg/clusterlink/network/adapter.go index c8ce08b80..09c779a0d 100644 --- a/pkg/clusterlink/network/adapter.go +++ b/pkg/clusterlink/network/adapter.go @@ -2,10 +2,13 @@ package network import ( "fmt" + "net" "github.com/pkg/errors" + "github.com/vishvananda/netlink" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" ) @@ -51,6 +54,20 @@ func (n *DefaultNetWork) LoadSysConfig() (*clusterlinkv1alpha1.NodeConfigSpec, e nodeConfigSpec.Arps = arps } + xfrmpolicies, err := loadXfrmPolicy() + if err != nil { + errs = errors.Wrap(err, fmt.Sprint(errs)) + } else { + nodeConfigSpec.XfrmPolicies = xfrmpolicies + } + + xfrmstates, err := loadXfrmState() + if err != nil { + errs = errors.Wrap(err, fmt.Sprint(errs)) + } else { + nodeConfigSpec.XfrmStates = xfrmstates + } + return nodeConfigSpec, errs } @@ -124,6 +141,14 @@ func (n *DefaultNetWork) UpdateDevices([]clusterlinkv1alpha1.Device) error { return ErrNotImplemented } +func (n *DefaultNetWork) UpdateIPSECPolicies([]clusterlinkv1alpha1.XfrmPolicy) error { + return ErrNotImplemented +} + +func (n *DefaultNetWork) UpdateXfrmStates([]clusterlinkv1alpha1.XfrmState) error { + return ErrNotImplemented +} + func (n *DefaultNetWork) AddArps(arps []clusterlinkv1alpha1.Arp) error { var errs error for _, arp := range arps { @@ -164,6 +189,88 @@ func (n *DefaultNetWork) AddRoutes(routes []clusterlinkv1alpha1.Route) error { return errs } +// For reference: +// https://github.com/flannel-io/flannel +func (n *DefaultNetWork) AddXfrmPolicies(xfrmpolicies []clusterlinkv1alpha1.XfrmPolicy) error { + for _, xfrmpolicy := range xfrmpolicies { + srcIP := net.ParseIP(xfrmpolicy.LeftIP) + dstIP := net.ParseIP(xfrmpolicy.RightIP) + _, srcNet, _ := net.ParseCIDR(xfrmpolicy.LeftNet) + _, dstNet, _ := net.ParseCIDR(xfrmpolicy.RightNet) + reqID := xfrmpolicy.ReqID + + var err error + var xfrmpolicydir netlink.Dir + switch v1alpha1.IPSECDirection(xfrmpolicy.Dir) { + case v1alpha1.IPSECOut: + xfrmpolicydir = netlink.XFRM_DIR_OUT + case v1alpha1.IPSECIn: + xfrmpolicydir = netlink.XFRM_DIR_IN + case v1alpha1.IPSECFwd: + xfrmpolicydir = netlink.XFRM_DIR_FWD + } + err = AddXFRMPolicy(srcNet, dstNet, srcIP, dstIP, xfrmpolicydir, reqID) + if err != nil { + return fmt.Errorf("error adding ipsec out policy: %v", err) + } + } + return nil +} + +// For reference: +// https://github.com/flannel-io/flannel +func (n *DefaultNetWork) DeleteXfrmPolicies(xfrmpolicies []clusterlinkv1alpha1.XfrmPolicy) error { + for _, xfrmpolicy := range xfrmpolicies { + srcIP := net.ParseIP(xfrmpolicy.LeftIP) + dstIP := net.ParseIP(xfrmpolicy.RightIP) + _, srcNet, _ := net.ParseCIDR(xfrmpolicy.LeftNet) + _, dstNet, _ := net.ParseCIDR(xfrmpolicy.RightNet) + reqID := xfrmpolicy.ReqID + + var xfrmpolicydir netlink.Dir + switch v1alpha1.IPSECDirection(xfrmpolicy.Dir) { + case v1alpha1.IPSECOut: + xfrmpolicydir = netlink.XFRM_DIR_OUT + case v1alpha1.IPSECIn: + xfrmpolicydir = netlink.XFRM_DIR_IN + case v1alpha1.IPSECFwd: + xfrmpolicydir = netlink.XFRM_DIR_FWD + } + + err := DeleteXFRMPolicy(srcNet, dstNet, srcIP, dstIP, xfrmpolicydir, reqID) + if err != nil { + return fmt.Errorf("error deleting ipsec out policy: %v", err) + } + } + return nil +} + +func (n *DefaultNetWork) AddXfrmStates(xfrmstates []clusterlinkv1alpha1.XfrmState) error { + for _, xfrmstate := range xfrmstates { + srcIP := net.ParseIP(xfrmstate.LeftIP) + dstIP := net.ParseIP(xfrmstate.RightIP) + reqID := xfrmstate.ReqID + err := AddXFRMState(srcIP, dstIP, reqID, xfrmstate.PSK) + if err != nil { + return fmt.Errorf("error adding ipsec state: %v", err) + } + } + return nil +} + +func (n *DefaultNetWork) DeleteXfrmStates(xfrmstates []clusterlinkv1alpha1.XfrmState) error { + for _, xfrmstate := range xfrmstates { + srcIP := net.ParseIP(xfrmstate.LeftIP) + dstIP := net.ParseIP(xfrmstate.RightIP) + reqID := xfrmstate.ReqID + err := DeleteXFRMState(srcIP, dstIP, reqID, xfrmstate.PSK) + if err != nil { + return fmt.Errorf("error deleting ipsec state: %v", err) + } + } + return nil +} + func (n *DefaultNetWork) AddDevices(devices []clusterlinkv1alpha1.Device) error { var errs error for _, device := range devices { diff --git a/pkg/clusterlink/network/interface.go b/pkg/clusterlink/network/interface.go index 58b6a43d1..df7c81cb3 100644 --- a/pkg/clusterlink/network/interface.go +++ b/pkg/clusterlink/network/interface.go @@ -19,18 +19,24 @@ type NetWork interface { DeleteIptables([]clusterlinkv1alpha1.Iptables) error DeleteRoutes([]clusterlinkv1alpha1.Route) error DeleteDevices([]clusterlinkv1alpha1.Device) error + DeleteXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + DeleteXfrmStates([]clusterlinkv1alpha1.XfrmState) error UpdateArps([]clusterlinkv1alpha1.Arp) error UpdateFdbs([]clusterlinkv1alpha1.Fdb) error UpdateIptables([]clusterlinkv1alpha1.Iptables) error UpdateRoutes([]clusterlinkv1alpha1.Route) error UpdateDevices([]clusterlinkv1alpha1.Device) error + UpdateIPSECPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + UpdateXfrmStates([]clusterlinkv1alpha1.XfrmState) error AddArps([]clusterlinkv1alpha1.Arp) error AddFdbs([]clusterlinkv1alpha1.Fdb) error AddIptables([]clusterlinkv1alpha1.Iptables) error AddRoutes([]clusterlinkv1alpha1.Route) error AddDevices([]clusterlinkv1alpha1.Device) error + AddXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + AddXfrmStates([]clusterlinkv1alpha1.XfrmState) error InitSys() diff --git a/pkg/clusterlink/network/xfrm_policy.go b/pkg/clusterlink/network/xfrm_policy.go new file mode 100644 index 000000000..54079bf25 --- /dev/null +++ b/pkg/clusterlink/network/xfrm_policy.go @@ -0,0 +1,179 @@ +package network + +import ( + "encoding/hex" + "errors" + "fmt" + "net" + "syscall" + + "github.com/vishvananda/netlink" + log "k8s.io/klog/v2" + + clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" +) + +// For reference: +// https://github.com/flannel-io/flannel +func AddXFRMPolicy(srcNet, dstNet *net.IPNet, srcIP, dstIP net.IP, dir netlink.Dir, reqID int) error { + policy := &netlink.XfrmPolicy{ + Src: srcNet, + Dst: dstNet, + Dir: dir, + } + + tmpl := netlink.XfrmPolicyTmpl{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Reqid: reqID, + } + + policy.Tmpls = append(policy.Tmpls, tmpl) + + if existingPolicy, err := netlink.XfrmPolicyGet(policy); err != nil { + if errors.Is(err, syscall.ENOENT) { + log.Infof("Adding ipsec policy: %+v", tmpl) + if err := netlink.XfrmPolicyAdd(policy); err != nil { + return fmt.Errorf("error adding policy: %+v err: %v", policy, err) + } + } else { + return fmt.Errorf("error getting policy: %+v err: %v", policy, err) + } + } else { + log.Infof("Updating ipsec policy %+v with %+v", existingPolicy, policy) + if err := netlink.XfrmPolicyUpdate(policy); err != nil { + return fmt.Errorf("error updating policy: %+v err: %v", policy, err) + } + } + return nil +} + +func DeleteXFRMPolicy(srcNet, dstNet *net.IPNet, srcIP, dstIP net.IP, dir netlink.Dir, reqID int) error { + policy := netlink.XfrmPolicy{ + Src: srcNet, + Dst: dstNet, + Dir: dir, + } + + tmpl := netlink.XfrmPolicyTmpl{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Reqid: reqID, + } + + log.Infof("Deleting ipsec policy: %+v", tmpl) + + policy.Tmpls = append(policy.Tmpls, tmpl) + + if err := netlink.XfrmPolicyDel(&policy); err != nil { + return fmt.Errorf("error deleting policy: %+v err: %v", policy, err) + } + + return nil +} + +func AddXFRMState(srcIP, dstIP net.IP, reqID int, psk string) error { + k, _ := hex.DecodeString(psk) + state := netlink.XfrmState{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Spi: reqID, + Reqid: reqID, + Aead: &netlink.XfrmStateAlgo{ + Name: "rfc4106(gcm(aes))", + Key: k, + ICVLen: 128, + }, + } + + if existingState, err := netlink.XfrmStateGet(&state); err != nil { + if errors.Is(err, syscall.ESRCH) || errors.Is(err, syscall.ENOENT) { + log.Infof("Adding xfrm state: %+v", state) + if err := netlink.XfrmStateAdd(&state); err != nil { + return fmt.Errorf("error adding state: %+v err: %v", state, err) + } + } else { + return fmt.Errorf("error getting state: %+v err: %v", state, err) + } + } else { + log.Infof("Updating xfrm state %+v with %+v", existingState, state) + if err := netlink.XfrmStateUpdate(&state); err != nil { + return fmt.Errorf("error updating state: %+v err: %v", state, err) + } + } + return nil +} + +func DeleteXFRMState(srcIP, dstIP net.IP, reqID int, psk string) error { + k, _ := hex.DecodeString(psk) + state := netlink.XfrmState{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Spi: reqID, + Reqid: reqID, + Aead: &netlink.XfrmStateAlgo{ + Name: "rfc4106(gcm(aes))", + Key: k, + ICVLen: 128, + }, + } + log.Infof("Deleting ipsec state: %+v", state) + err := netlink.XfrmStateDel(&state) + if err != nil { + return fmt.Errorf("error delete xfrm state: %+v err: %v", state, err) + } + return nil +} + +func ListXfrmPolicy() ([]clusterlinkv1alpha1.XfrmPolicy, error) { + xfrmpolicies, err := netlink.XfrmPolicyList(netlink.FAMILY_ALL) + if err != nil { + return nil, fmt.Errorf("error list xfrm policy: %v", err) + } + var ret []clusterlinkv1alpha1.XfrmPolicy + for _, policy := range xfrmpolicies { + ret = append(ret, clusterlinkv1alpha1.XfrmPolicy{ + LeftIP: policy.Tmpls[0].Src.String(), + LeftNet: policy.Src.String(), + RightIP: policy.Tmpls[0].Dst.String(), + RightNet: policy.Dst.String(), + ReqID: policy.Tmpls[0].Reqid, + Dir: int(policy.Dir), + }) + } + return ret, nil +} + +func ListXfrmState() ([]clusterlinkv1alpha1.XfrmState, error) { + xfrmstates, err := netlink.XfrmStateList(netlink.FAMILY_ALL) + if err != nil { + return nil, fmt.Errorf("error list xfrm state: %v", err) + } + var ret []clusterlinkv1alpha1.XfrmState + for _, state := range xfrmstates { + k := hex.EncodeToString(state.Aead.Key) + ret = append(ret, clusterlinkv1alpha1.XfrmState{ + LeftIP: state.Src.String(), + RightIP: state.Dst.String(), + ReqID: state.Reqid, + PSK: k, + }) + } + return ret, nil +} + +func loadXfrmPolicy() ([]clusterlinkv1alpha1.XfrmPolicy, error) { + return ListXfrmPolicy() +} + +func loadXfrmState() ([]clusterlinkv1alpha1.XfrmState, error) { + return ListXfrmState() +} diff --git a/pkg/kosmosctl/install/install.go b/pkg/kosmosctl/install/install.go index 8fe6f535c..fa0f09827 100644 --- a/pkg/kosmosctl/install/install.go +++ b/pkg/kosmosctl/install/install.go @@ -3,6 +3,7 @@ package install import ( "context" "fmt" + "net" "os" "path/filepath" @@ -60,6 +61,7 @@ type CommandInstallOptions struct { NetworkType string IpFamily string UseProxy string + NodeElasticIP map[string]string KosmosClient versioned.Interface K8sClient kubernetes.Interface @@ -96,6 +98,7 @@ func NewCmdInstall(f ctlutil.Factory) *cobra.Command { flags.StringVar(&o.IpFamily, "ip-family", string(v1alpha1.IPFamilyTypeIPV4), "Specify the IP protocol version used by network devices, common IP families include IPv4 and IPv6.") flags.StringVar(&o.UseProxy, "use-proxy", "false", "Set whether to enable proxy.") flags.IntVarP(&o.WaitTime, "wait-time", "", utils.DefaultWaitTime, "Wait the specified time for the Kosmos install ready.") + flags.StringToStringVar(&o.NodeElasticIP, "node-elasticip", nil, "Set cluster node with elastic ip.") return cmd } @@ -143,8 +146,18 @@ func (o *CommandInstallOptions) Complete(f ctlutil.Factory) error { } func (o *CommandInstallOptions) Validate() error { + validationErr := "kosmosctl install validate error" if len(o.Namespace) == 0 { - return fmt.Errorf("kosmosctl install validate error, namespace is not valid") + return fmt.Errorf("%s, namespace is not valid", validationErr) + } + for nodeName, elasticIP := range o.NodeElasticIP { + _, err := o.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("%s, node %s is invalid: %v", validationErr, nodeName, err) + } + if net.ParseIP(elasticIP) == nil { + return fmt.Errorf("%s, ElasticIP %s is invalid", validationErr, elasticIP) + } } return nil @@ -579,6 +592,7 @@ func (o *CommandInstallOptions) createControlCluster() error { NetworkType: o.NetworkType, IpFamily: o.IpFamily, UseProxy: o.UseProxy, + NodeElasticIP: o.NodeElasticIP, } err = joinOptions.Run(clusterArgs) @@ -608,6 +622,7 @@ func (o *CommandInstallOptions) createControlCluster() error { case utils.DefaultIPv6: controlCluster.Spec.ClusterLinkOptions.IPFamily = v1alpha1.IPFamilyTypeIPV6 } + controlCluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP _, err = o.KosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), controlCluster, metav1.UpdateOptions{}) if err != nil { klog.Infof("ControlCluster-Link: ", controlCluster) @@ -671,6 +686,7 @@ func (o *CommandInstallOptions) createControlCluster() error { NetworkType: o.NetworkType, IpFamily: o.IpFamily, UseProxy: o.UseProxy, + NodeElasticIP: o.NodeElasticIP, EnableTree: true, } @@ -702,6 +718,7 @@ func (o *CommandInstallOptions) createControlCluster() error { case utils.DefaultIPv6: controlCluster.Spec.ClusterLinkOptions.IPFamily = v1alpha1.IPFamilyTypeIPV6 } + controlCluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP _, err = o.KosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), controlCluster, metav1.UpdateOptions{}) if err != nil { klog.Infof("ControlCluster-All: ", controlCluster) diff --git a/pkg/kosmosctl/join/join.go b/pkg/kosmosctl/join/join.go index a307adc6f..26ad33ff0 100644 --- a/pkg/kosmosctl/join/join.go +++ b/pkg/kosmosctl/join/join.go @@ -3,6 +3,7 @@ package join import ( "context" "fmt" + "net" "os" "path/filepath" @@ -58,6 +59,7 @@ type CommandJoinOptions struct { NetworkType string IpFamily string UseProxy string + NodeElasticIP map[string]string EnableTree bool @@ -100,6 +102,7 @@ func NewCmdJoin(f ctlutil.Factory) *cobra.Command { flags.StringVar(&o.UseProxy, "use-proxy", "false", "Set whether to enable proxy.") flags.BoolVar(&o.EnableTree, "enable-tree", false, "Turn on clustertree.") flags.IntVarP(&o.WaitTime, "wait-time", "", utils.DefaultWaitTime, "Wait the specified time for the Kosmos install ready.") + flags.StringToStringVar(&o.NodeElasticIP, "node-elasticip", nil, "Set cluster node with elastic ip.") return cmd } @@ -165,6 +168,17 @@ func (o *CommandJoinOptions) Validate(args []string) error { return fmt.Errorf("kosmosctl join validate error, namespace is not valid") } + validationErr := "kosmosctl join validate error" + for nodeName, elasticIP := range o.NodeElasticIP { + _, err := o.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("%s, node %s is invalid: %v", validationErr, nodeName, err) + } + if net.ParseIP(elasticIP) == nil { + return fmt.Errorf("%s, ElasticIP %s is invalid", validationErr, elasticIP) + } + } + switch args[0] { case "cluster": _, err := o.KosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), o.Name, metav1.GetOptions{}) @@ -216,8 +230,9 @@ func (o *CommandJoinOptions) runCluster() error { IP: "210.0.0.0/8", IP6: "9470::0/16", }, - NetworkType: v1alpha1.NetWorkTypeGateWay, - IPFamily: v1alpha1.IPFamilyTypeIPV4, + NetworkType: v1alpha1.NetWorkTypeGateWay, + IPFamily: v1alpha1.IPFamilyTypeIPV4, + NodeElasticIPMap: o.NodeElasticIP, }, ClusterTreeOptions: v1alpha1.ClusterTreeOptions{ Enable: o.EnableTree, @@ -244,6 +259,7 @@ func (o *CommandJoinOptions) runCluster() error { cluster.Spec.ClusterLinkOptions.DefaultNICName = o.DefaultNICName cluster.Spec.ClusterLinkOptions.CNI = o.CNI + cluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP } // ToDo ClusterTree currently has no init parameters, can be expanded later. diff --git a/pkg/kosmosctl/manifest/manifest_crds.go b/pkg/kosmosctl/manifest/manifest_crds.go index 20314e586..e1497a602 100644 --- a/pkg/kosmosctl/manifest/manifest_crds.go +++ b/pkg/kosmosctl/manifest/manifest_crds.go @@ -347,6 +347,8 @@ spec: properties: clusterName: type: string + elasticip: + type: string interfaceName: type: string ip: @@ -365,16 +367,17 @@ spec: type: array type: object status: + properties: + nodeStatus: + type: string type: object required: - spec type: object served: true storage: true - subresources: - status: {} + subresources: {} ` - const Cluster = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -482,6 +485,12 @@ spec: - nodeName type: object type: array + nodeElasticIPMap: + additionalProperties: + type: string + description: NodeElasticIPMap presents mapping between nodename + in kubernetes and elasticIP + type: object useIPPool: default: false type: boolean @@ -491,60 +500,11 @@ spec: enable: default: true type: boolean - leafModel: - description: LeafModel provide an api to arrange the member cluster + leafModels: + description: LeafModels provide an api to arrange the member cluster with some rules to pretend one or more leaf node items: properties: - labelSelector: - description: LabelSelector is a filter to select member - cluster nodes to pretend a leaf node in clusterTree by - labels. If nil or empty, the hole member cluster nodes - will pretend one leaf node. - properties: - matchExpressions: - description: matchExpressions is a list of label selector - requirements. The requirements are ANDed. - items: - description: A label selector requirement is a selector - that contains values, a key, and an operator that - relates the key and values. - properties: - key: - description: key is the label key that the selector - applies to. - type: string - operator: - description: operator represents a key's relationship - to a set of values. Valid operators are In, - NotIn, Exists and DoesNotExist. - type: string - values: - description: values is an array of string values. - If the operator is In or NotIn, the values array - must be non-empty. If the operator is Exists - or DoesNotExist, the values array must be empty. - This array is replaced during a strategic merge - patch. - items: - type: string - type: array - required: - - key - - operator - type: object - type: array - matchLabels: - additionalProperties: - type: string - description: matchLabels is a map of {key,value} pairs. - A single {key,value} in the matchLabels map is equivalent - to an element of matchExpressions, whose key field - is "key", the operator is "In", and the values array - contains only "value". The requirements are ANDed. - type: object - type: object - x-kubernetes-map-type: atomic labels: additionalProperties: type: string @@ -556,6 +516,65 @@ spec: the leaf node name will generate by controller and fill in cluster link status type: string + nodeSelector: + description: NodeSelector is a selector to select member + cluster nodes to pretend a leaf node in clusterTree. + properties: + labelSelector: + description: LabelSelector is a filter to select member + cluster nodes to pretend a leaf node in clusterTree + by labels. It will work on second level schedule on + pod create in member clusters. + properties: + matchExpressions: + description: matchExpressions is a list of label + selector requirements. The requirements are ANDed. + items: + description: A label selector requirement is a + selector that contains values, a key, and an + operator that relates the key and values. + properties: + key: + description: key is the label key that the + selector applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are + In, NotIn, Exists and DoesNotExist. + type: string + values: + description: values is an array of string + values. If the operator is In or NotIn, + the values array must be non-empty. If the + operator is Exists or DoesNotExist, the + values array must be empty. This array is + replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} + pairs. A single {key,value} in the matchLabels + map is equivalent to an element of matchExpressions, + whose key field is "key", the operator is "In", + and the values array contains only "value". The + requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + nodeName: + description: NodeName is Member cluster origin node + Name + type: string + type: object taints: description: Taints attached to the leaf pretended Node. If nil or empty, controller will set the default no-schedule @@ -642,7 +661,6 @@ spec: storage: true subresources: {} ` - const NodeConfig = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -767,6 +785,48 @@ spec: - gw type: object type: array + xfrmpolicies: + items: + properties: + dir: + type: integer + leftip: + type: string + leftnet: + type: string + reqid: + type: integer + rightip: + type: string + rightnet: + type: string + required: + - dir + - leftip + - leftnet + - reqid + - rightip + - rightnet + type: object + type: array + xfrmstates: + items: + properties: + PSK: + type: string + leftip: + type: string + reqid: + type: integer + rightip: + type: string + required: + - PSK + - leftip + - reqid + - rightip + type: object + type: array type: object status: properties: @@ -785,7 +845,6 @@ spec: subresources: status: {} ` - const DaemonSet = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition