diff --git a/charts/kube-ovn/templates/kube-ovn-crd.yaml b/charts/kube-ovn/templates/kube-ovn-crd.yaml index 565f54ebda6..37ad6acfbfd 100644 --- a/charts/kube-ovn/templates/kube-ovn-crd.yaml +++ b/charts/kube-ovn/templates/kube-ovn-crd.yaml @@ -503,6 +503,31 @@ spec: type: string qosPolicy: type: string + bgpSpeaker: + type: object + properties: + enabled: + type: boolean + asn: + type: integer + remoteAsn: + type: integer + neighbors: + type: array + items: + type: string + holdTime: + type: string + routerId: + type: string + password: + type: string + enableGracefulRestart: + type: boolean + extraArgs: + type: array + items: + type: string tolerations: type: array items: diff --git a/dist/images/install.sh b/dist/images/install.sh index 6dc342a0d1a..70055457461 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -741,6 +741,31 @@ spec: type: string qosPolicy: type: string + bgpSpeaker: + type: object + properties: + enabled: + type: boolean + asn: + type: integer + remoteAsn: + type: integer + neighbors: + type: array + items: + type: string + holdTime: + type: string + routerId: + type: string + password: + type: string + enableGracefulRestart: + type: boolean + extraArgs: + type: array + items: + type: string tolerations: type: array items: diff --git a/pkg/apis/kubeovn/v1/types.go b/pkg/apis/kubeovn/v1/types.go index 5f2c25d8146..a2e951926e3 100644 --- a/pkg/apis/kubeovn/v1/types.go +++ b/pkg/apis/kubeovn/v1/types.go @@ -524,6 +524,19 @@ type VpcNatSpec struct { Tolerations []corev1.Toleration `json:"tolerations"` Affinity corev1.Affinity `json:"affinity"` QoSPolicy string `json:"qosPolicy"` + BgpSpeaker VpcBgpSpeaker `json:"bgpSpeaker"` +} + +type VpcBgpSpeaker struct { + Enabled bool `json:"enabled"` + ASN uint32 `json:"asn"` + RemoteASN uint32 `json:"remoteAsn"` + Neighbors []string `json:"neighbors"` + HoldTime metav1.Duration `json:"holdTime"` + RouterID string `json:"routerId"` + Password string `json:"password"` + EnableGracefulRestart 
bool `json:"enableGracefulRestart"` + ExtraArgs []string `json:"extraArgs"` } type VpcNatStatus struct { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ab81820b617..a202df48c96 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1112,7 +1112,7 @@ func (c *Controller) startWorkers(ctx context.Context) { }, time.Second, ctx.Done()) go wait.Until(func() { - c.resyncVpcNatImage() + c.resyncVpcNatConfig() }, time.Second, ctx.Done()) go wait.Until(func() { diff --git a/pkg/controller/vpc_nat.go b/pkg/controller/vpc_nat.go index 57b0eee160d..2cd93f698e0 100644 --- a/pkg/controller/vpc_nat.go +++ b/pkg/controller/vpc_nat.go @@ -8,15 +8,22 @@ import ( "github.com/kubeovn/kube-ovn/pkg/util" ) -var vpcNatImage = "" +var ( + vpcNatImage = "" + vpcNatGwBgpSpeakerImage = "" + vpcNatAPINadName = "" + vpcNatAPINadProvider = "" +) -func (c *Controller) resyncVpcNatImage() { +func (c *Controller) resyncVpcNatConfig() { cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatConfig) if err != nil { err = fmt.Errorf("failed to get ovn-vpc-nat-config, %w", err) klog.Error(err) return } + + // Image we're using to provision the NAT gateways image, exist := cm.Data["image"] if !exist { err = fmt.Errorf("%s should have image field", util.VpcNatConfig) @@ -24,4 +31,13 @@ func (c *Controller) resyncVpcNatImage() { return } vpcNatImage = image + + // Image for the BGP sidecar of the gateway (optional) + vpcNatGwBgpSpeakerImage = cm.Data["bgpSpeakerImage"] + + // NetworkAttachmentDefinition name for the BGP speaker to call the API server + vpcNatAPINadName = cm.Data["apiNadName"] + + // NetworkAttachmentDefinition provider for the BGP speaker to call the API server + vpcNatAPINadProvider = cm.Data["apiNadProvider"] } diff --git a/pkg/controller/vpc_nat_gateway.go b/pkg/controller/vpc_nat_gateway.go index fb13f8f3f61..6e772485fd5 100644 --- a/pkg/controller/vpc_nat_gateway.go +++ 
b/pkg/controller/vpc_nat_gateway.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "maps" + "os" "reflect" "regexp" "slices" @@ -735,6 +736,52 @@ func (c *Controller) execNatGwRules(pod *corev1.Pod, operation string, rules []s return nil } +func (c *Controller) setNatGwInterface(annotations map[string]string, externalNetwork string, defaultSubnet *kubeovnv1.Subnet) error { + if vpcNatAPINadName == "" { + return errors.New("no NetworkAttachmentDefinition provided to access apiserver, check configmap ovn-vpc-nat-config and field 'apiNadName'") + } + + nad := fmt.Sprintf("%s/%s, %s/%s", c.config.PodNamespace, externalNetwork, corev1.NamespaceDefault, vpcNatAPINadName) + annotations[util.AttachmentNetworkAnnotation] = nad + + return setNatGwRoute(annotations, defaultSubnet.Spec.Gateway) +} + +func setNatGwRoute(annotations map[string]string, subnetGw string) error { + dst := os.Getenv("KUBERNETES_SERVICE_HOST") + + protocol := util.CheckProtocol(dst) + if !strings.ContainsRune(dst, '/') { + switch protocol { + case kubeovnv1.ProtocolIPv4: + dst = fmt.Sprintf("%s/32", dst) + case kubeovnv1.ProtocolIPv6: + dst = fmt.Sprintf("%s/128", dst) + } + } + + // Check the provider of the API NetworkAttachmentDefinition is configured, otherwise we cannot build the + // routes annotation that lets the BGP speaker reach the K8S apiserver (and it won't be able to detect EIPs) + if vpcNatAPINadProvider == "" { + return errors.New("no NetworkAttachmentDefinition provider configured to access apiserver, check configmap ovn-vpc-nat-config and field 'apiNadProvider'") + } + + for _, gw := range strings.Split(subnetGw, ",") { + if util.CheckProtocol(gw) == protocol { + routes := []request.Route{{Destination: dst, Gateway: gw}} + buf, err := json.Marshal(routes) + if err != nil { + return fmt.Errorf("failed to marshal routes %+v: %w", routes, err) + } + + annotations[fmt.Sprintf(util.RoutesAnnotationTemplate, vpcNatAPINadProvider)] = string(buf) + break + } + } + + return nil +} + func (c *Controller) genNatGwStatefulSet(gw 
*kubeovnv1.VpcNatGateway, oldSts *v1.StatefulSet) (*v1.StatefulSet, error) { annotations := make(map[string]string, 7) if oldSts != nil && len(oldSts.Annotations) != 0 { @@ -747,6 +794,18 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1 util.LogicalSwitchAnnotation: gw.Spec.Subnet, util.IPAddressAnnotation: gw.Spec.LanIP, } + + if gw.Spec.BgpSpeaker.Enabled { // Add an interface that can reach the API server + defaultSubnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch) + if err != nil { + return nil, fmt.Errorf("failed to get default subnet %s: %w", c.config.DefaultLogicalSwitch, err) + } + + if err := c.setNatGwInterface(podAnnotations, nadName, defaultSubnet); err != nil { + return nil, err + } + } + for key, value := range podAnnotations { annotations[key] = value } @@ -782,6 +841,7 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1 routes = append(routes, request.Route{Destination: cidrV6, Gateway: v6Gateway}) } } + if err = setPodRoutesAnnotation(annotations, util.OvnProvider, routes); err != nil { klog.Error(err) return nil, err @@ -820,6 +880,7 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1 "app": name, util.VpcNatGatewayLabel: "true", } + sts := &v1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -859,6 +920,106 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1 }, }, } + + // BGP speaker for GWs must be enabled globally and for this specific instance + if gw.Spec.BgpSpeaker.Enabled { + containers := sts.Spec.Template.Spec.Containers + + // We need a speaker image configured in the NAT GW ConfigMap + if vpcNatGwBgpSpeakerImage == "" { + return nil, fmt.Errorf("%s should have bgp speaker image field if bgp enabled", util.VpcNatConfig) + } + + args := []string{ + "--nat-gw-mode", // Force to run in NAT GW mode, we're not announcing Pod IPs or Services, only EIPs + } + + speakerParams := 
gw.Spec.BgpSpeaker + + if speakerParams.RouterID != "" { // Override default auto-selected RouterID + args = append(args, fmt.Sprintf("--router-id=%s", speakerParams.RouterID)) + } + + if speakerParams.Password != "" { // Password for TCP MD5 BGP + args = append(args, fmt.Sprintf("--auth-password=%s", speakerParams.Password)) + } + + if speakerParams.EnableGracefulRestart { // Enable graceful restart + args = append(args, "--graceful-restart") + } + + if speakerParams.HoldTime != (metav1.Duration{}) { // Hold time + args = append(args, fmt.Sprintf("--holdtime=%s", speakerParams.HoldTime.Duration.String())) + } + + if speakerParams.ASN == 0 { // The ASN we use to speak + return nil, errors.New("ASN not set, but must be non-zero value") + } + + if speakerParams.RemoteASN == 0 { // The ASN we speak to + return nil, errors.New("remote ASN not set, but must be non-zero value") + } + + args = append(args, fmt.Sprintf("--cluster-as=%d", speakerParams.ASN)) + args = append(args, fmt.Sprintf("--neighbor-as=%d", speakerParams.RemoteASN)) + + if len(speakerParams.Neighbors) == 0 { + return nil, errors.New("no BGP neighbors specified") + } + + var neighIPv4 []string + var neighIPv6 []string + for _, neighbor := range speakerParams.Neighbors { + switch util.CheckProtocol(neighbor) { + case kubeovnv1.ProtocolIPv4: + neighIPv4 = append(neighIPv4, neighbor) + case kubeovnv1.ProtocolIPv6: + neighIPv6 = append(neighIPv6, neighbor) + } + } + + argNeighIPv4 := strings.Join(neighIPv4, ",") + argNeighIPv6 := strings.Join(neighIPv6, ",") + argNeighIPv4 = fmt.Sprintf("--neighbor-address=%s", argNeighIPv4) + argNeighIPv6 = fmt.Sprintf("--neighbor-ipv6-address=%s", argNeighIPv6) + + if len(neighIPv4) > 0 { + args = append(args, argNeighIPv4) + } + + if len(neighIPv6) > 0 { + args = append(args, argNeighIPv6) + } + + // Extra args to start the speaker with, for example, logging levels... + args = append(args, speakerParams.ExtraArgs...) 
+ + sts.Spec.Template.Spec.ServiceAccountName = "vpc-nat-gw" + speakerContainer := corev1.Container{ + Name: "vpc-nat-gw-speaker", + Image: vpcNatGwBgpSpeakerImage, + Command: []string{"/kube-ovn/kube-ovn-speaker"}, + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + { + Name: util.GatewayNameEnv, + Value: gw.Name, + }, + { + Name: "POD_IP", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }, + }, + }, + Args: args, + } + + sts.Spec.Template.Spec.Containers = append(containers, speakerContainer) + } + return sts, nil } diff --git a/pkg/speaker/bgp.go b/pkg/speaker/bgp.go new file mode 100644 index 00000000000..3ec6008bd37 --- /dev/null +++ b/pkg/speaker/bgp.go @@ -0,0 +1,240 @@ +package speaker + +import ( + "context" + "fmt" + "net" + + bgpapi "github.com/osrg/gobgp/v3/api" + bgpapiutil "github.com/osrg/gobgp/v3/pkg/apiutil" + "github.com/osrg/gobgp/v3/pkg/packet/bgp" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + "google.golang.org/protobuf/types/known/anypb" + "k8s.io/klog/v2" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" +) + +var maskMap = map[string]int{kubeovnv1.ProtocolIPv4: 32, kubeovnv1.ProtocolIPv6: 128} + +// reconcileRoutes configures the BGP speaker to announce only the routes we are expected to announce +// and to withdraw the ones that should not be announced anymore +func (c *Controller) reconcileRoutes(expectedPrefixes prefixMap) error { + if len(c.config.NeighborAddresses) != 0 { + err := c.reconcileIPFamily(kubeovnv1.ProtocolIPv4, expectedPrefixes) + if err != nil { + return fmt.Errorf("failed to reconcile IPv4 routes: %w", err) + } + } + + if len(c.config.NeighborIPv6Addresses) != 0 { + err := c.reconcileIPFamily(kubeovnv1.ProtocolIPv6, expectedPrefixes) + if err != nil { + return fmt.Errorf("failed to reconcile IPv6 routes: %w", err) + } + } + + return nil +} + +// reconcileIPFamily 
announces prefixes we are not currently announcing and withdraws prefixes we should +// not be announcing for a given IP family (IPv4/IPv6) +func (c *Controller) reconcileIPFamily(ipFamily string, expectedPrefixes prefixMap) error { + // Get the address family associated with the Kube-OVN family + afi, err := kubeOvnFamilyToAFI(ipFamily) + if err != nil { + return fmt.Errorf("couldn't convert family to afi: %w", err) + } + + // Craft a BGP path listing request for this AFI + listPathRequest := &bgpapi.ListPathRequest{ + TableType: bgpapi.TableType_GLOBAL, + Family: &bgpapi.Family{Afi: afi, Safi: bgpapi.Family_SAFI_UNICAST}, + } + + // Anonymous function that stores the prefixes we are announcing for this AFI + var existingPrefixes []string + fn := func(d *bgpapi.Destination) { + for _, path := range d.Paths { + attrInterfaces, _ := bgpapiutil.UnmarshalPathAttributes(path.Pattrs) + nextHop := getNextHopFromPathAttributes(attrInterfaces) + klog.V(5).Infof("announcing route with prefix %s and nexthop: %s", d.Prefix, nextHop.String()) + + route, _ := netlink.RouteGet(nextHop) + if len(route) == 1 && route[0].Type == unix.RTN_LOCAL || nextHop.String() == c.config.RouterID { + existingPrefixes = append(existingPrefixes, d.Prefix) + return + } + } + } + + // Ask the BGP speaker what route we're announcing for the IP family selected + if err := c.config.BgpServer.ListPath(context.Background(), listPathRequest, fn); err != nil { + return fmt.Errorf("failed to list existing %s routes: %w", ipFamily, err) + } + + klog.V(5).Infof("currently announcing %s routes: %v", ipFamily, existingPrefixes) + + // Announce routes we should be announcing and withdraw the ones that are no longer valid + c.announceAndWithdraw(routeDiff(expectedPrefixes[ipFamily], existingPrefixes)) + return nil +} + +// announceAndWithdraw commands the BGP speaker to start announcing new routes and to withdraw others +func (c *Controller) announceAndWithdraw(toAdd, toDel []string) { + // Announce routes that 
need to be added + klog.V(5).Infof("new routes we will announce: %v", toAdd) + for _, route := range toAdd { + if err := c.addRoute(route); err != nil { + klog.Error(err) + } + } + + // Withdraw routes that should be deleted + klog.V(5).Infof("announced routes we will withdraw: %v", toDel) + for _, route := range toDel { + if err := c.delRoute(route); err != nil { + klog.Error(err) + } + } +} + +// addRoute adds a new route to advertise from our BGP speaker +func (c *Controller) addRoute(route string) error { + // Determine the Address Family Identifier (IPv6/IPv4) + routeAfi := bgpapi.Family_AFI_IP + if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 { + routeAfi = bgpapi.Family_AFI_IP6 + } + + // Get NLRI and attributes to announce all the next hops possible + nlri, attrs, err := c.getNlriAndAttrs(route) + if err != nil { + return fmt.Errorf("failed to get NLRI and attributes: %w", err) + } + + // Announce every next hop we have + for _, attr := range attrs { + _, err = c.config.BgpServer.AddPath(context.Background(), &bgpapi.AddPathRequest{ + Path: &bgpapi.Path{ + Family: &bgpapi.Family{Afi: routeAfi, Safi: bgpapi.Family_SAFI_UNICAST}, + Nlri: nlri, + Pattrs: attr, + }, + }) + if err != nil { + return err + } + } + + return nil +} + +// delRoute removes a route we are currently advertising from our BGP speaker +func (c *Controller) delRoute(route string) error { + // Determine the Address Family Identifier (IPv6/IPv4) + routeAfi := bgpapi.Family_AFI_IP + if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 { + routeAfi = bgpapi.Family_AFI_IP6 + } + + // Get NLRI and attributes to withdraw all the next hops possible + nlri, attrs, err := c.getNlriAndAttrs(route) + if err != nil { + return fmt.Errorf("failed to get NLRI and attributes: %w", err) + } + + // Withdraw every next hop we have + for _, attr := range attrs { + err = c.config.BgpServer.DeletePath(context.Background(), &bgpapi.DeletePathRequest{ + Path: &bgpapi.Path{ + Family: &bgpapi.Family{Afi: 
routeAfi, Safi: bgpapi.Family_SAFI_UNICAST}, + Nlri: nlri, + Pattrs: attr, + }, + }) + if err != nil { + return err + } + } + return nil +} + +// getNlriAndAttrs returns the Network Layer Reachability Information (NLRI) and the BGP attributes for a particular route +func (c *Controller) getNlriAndAttrs(route string) (*anypb.Any, [][]*anypb.Any, error) { + // Should this route be advertised to IPv4 or IPv6 peers + // GoBGP supports extended-nexthop, so we should be able to advertise IPv4 NLRI to IPv6 peers and the opposite too + // Is this check really necessary? + neighborAddresses := c.config.NeighborAddresses + if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 { + neighborAddresses = c.config.NeighborIPv6Addresses + } + + // Get the route we're about to advertise and transform it to an NLRI + prefix, prefixLen, err := parseRoute(route) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse route: %w", err) + } + + // Marshal the NLRI + nlri, err := anypb.New(&bgpapi.IPAddressPrefix{ + Prefix: prefix, + PrefixLen: prefixLen, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal NLRI: %w", err) + } + + // Create attributes for each neighbor to advertise the correct next hop + attrs := make([][]*anypb.Any, 0, len(neighborAddresses)) + for _, addr := range neighborAddresses { + a1, _ := anypb.New(&bgpapi.OriginAttribute{ + Origin: 0, + }) + a2, _ := anypb.New(&bgpapi.NextHopAttribute{ + NextHop: c.getNextHopAttribute(addr), + }) + attrs = append(attrs, []*anypb.Any{a1, a2}) + } + + return nlri, attrs, err +} + +// getNextHopAttribute returns the next hop we should advertise for a specific BGP neighbor +func (c *Controller) getNextHopAttribute(neighborAddress string) string { + nextHop := c.config.RouterID // If no route is found, fallback to router ID + + // Retrieve the route we use to speak to this neighbor and consider the source as next hop + routes, err := netlink.RouteGet(net.ParseIP(neighborAddress)) + if err == nil && len(routes) 
== 1 && routes[0].Src != nil { + nextHop = routes[0].Src.String() + } + + proto := util.CheckProtocol(nextHop) // Is next hop IPv4 or IPv6 + nextHopIP := net.ParseIP(nextHop) // Convert next hop to an IP + + // This takes care of a special case where the speaker might not be running in host mode + // If this happens, the nextHopIP will be the IP of a Pod (probably unreachable for the neighbours) + // For this case, the configuration allows for manually specifying the IPs to use as next hop (per protocol) + nodeIP := c.config.NodeIPs[proto] + if nodeIP != nil && nextHopIP != nil && nextHopIP.Equal(c.config.PodIPs[proto]) { + nextHop = nodeIP.String() + } + + return nextHop +} + +// getNextHopFromPathAttributes returns the next hop from BGP path attributes +func getNextHopFromPathAttributes(attrs []bgp.PathAttributeInterface) net.IP { + for _, attr := range attrs { + switch a := attr.(type) { + case *bgp.PathAttributeNextHop: + return a.Value + case *bgp.PathAttributeMpReachNLRI: + return a.Nexthop + } + } + return nil +} diff --git a/pkg/speaker/config.go b/pkg/speaker/config.go index 42df76fe0a8..49d2f69c58f 100644 --- a/pkg/speaker/config.go +++ b/pkg/speaker/config.go @@ -55,6 +55,7 @@ type Configuration struct { GracefulRestartTime time.Duration PassiveMode bool EbgpMultihopTTL uint8 + NatGwMode bool NodeName string KubeConfigFile string @@ -68,11 +69,11 @@ func ParseFlags() (*Configuration, error) { var ( argDefaultGracefulTime = pflag.Duration("graceful-restart-time", DefaultGracefulRestartTime, "BGP Graceful restart time according to RFC4724 3, maximum 4095s.") argGracefulRestartDeferralTime = pflag.Duration("graceful-restart-deferral-time", DefaultGracefulRestartDeferralTime, "BGP Graceful restart deferral time according to RFC4724 4.1, maximum 18h.") - argGracefulRestart = pflag.BoolP("graceful-restart", "", false, "Enables the BGP Graceful Restart so that routes are preserved on unexpected restarts") - argAnnounceClusterIP = 
pflag.BoolP("announce-cluster-ip", "", false, "The Cluster IP of the service to announce to the BGP peers.") + argGracefulRestart = pflag.BoolP("graceful-restart", "", false, "Enables the BGP Graceful Restart so that routes are preserved on unexpected restarts") + argAnnounceClusterIP = pflag.BoolP("announce-cluster-ip", "", false, "The Cluster IP of the service to announce to the BGP peers.") argGrpcHost = pflag.String("grpc-host", "127.0.0.1", "The host address for grpc to listen, default: 127.0.0.1") argGrpcPort = pflag.Uint32("grpc-port", DefaultBGPGrpcPort, "The port for grpc to listen, default:50051") - argClusterAs = pflag.Uint32("cluster-as", DefaultBGPClusterAs, "The as number of container network, default 65000") + argClusterAs = pflag.Uint32("cluster-as", DefaultBGPClusterAs, "The AS number of container network, default 65000") argRouterID = pflag.String("router-id", "", "The address for the speaker to use as router id, default the node ip") argNodeIPs = pflag.String("node-ips", "", "The comma-separated list of node IP addresses to use instead of the pod IP address for the next hop router IP address.") argNeighborAddress = pflag.String("neighbor-address", "", "Comma separated IPv4 router addresses the speaker connects to.") @@ -83,8 +84,9 @@ func ParseFlags() (*Configuration, error) { argPprofPort = pflag.Uint32("pprof-port", DefaultPprofPort, "The port to get profiling data, default: 10667") argNodeName = pflag.String("node-name", os.Getenv(util.HostnameEnv), "Name of the node on which the speaker is running on.") argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. 
If not set use the inCluster token.") - argPassiveMode = pflag.BoolP("passivemode", "", false, "Set BGP Speaker to passive model,do not actively initiate connections to peers") + argPassiveMode = pflag.BoolP("passivemode", "", false, "Set BGP Speaker to passive model, do not actively initiate connections to peers") argEbgpMultihopTTL = pflag.Uint8("ebgp-multihop", DefaultEbgpMultiHop, "The TTL value of EBGP peer, default: 1") + argNatGwMode = pflag.BoolP("nat-gw-mode", "", false, "Make the BGP speaker announce EIPs from inside a NAT gateway, Pod IP/Service/Subnet announcements will be disabled") ) klogFlags := flag.NewFlagSet("klog", flag.ExitOnError) klog.InitFlags(klogFlags) @@ -148,6 +150,7 @@ func ParseFlags() (*Configuration, error) { GracefulRestartTime: *argDefaultGracefulTime, PassiveMode: *argPassiveMode, EbgpMultihopTTL: *argEbgpMultihopTTL, + NatGwMode: *argNatGwMode, } if *argNeighborAddress != "" { diff --git a/pkg/speaker/controller.go b/pkg/speaker/controller.go index 1cc9030c8e4..1fcbda7e66e 100644 --- a/pkg/speaker/controller.go +++ b/pkg/speaker/controller.go @@ -26,13 +26,21 @@ const controllerAgentName = "ovn-speaker" type Controller struct { config *Configuration - podsLister listerv1.PodLister - podsSynced cache.InformerSynced - subnetsLister kubeovnlister.SubnetLister - subnetSynced cache.InformerSynced + podsLister listerv1.PodLister + podsSynced cache.InformerSynced + + subnetsLister kubeovnlister.SubnetLister + subnetSynced cache.InformerSynced + servicesLister listerv1.ServiceLister servicesSynced cache.InformerSynced + eipLister kubeovnlister.IptablesEIPLister + eipSynced cache.InformerSynced + + natgatewayLister kubeovnlister.VpcNatGatewayLister + natgatewaySynced cache.InformerSynced + informerFactory kubeinformers.SharedInformerFactory kubeovnInformerFactory kubeovninformer.SharedInformerFactory recorder record.EventRecorder @@ -58,16 +66,22 @@ func NewController(config *Configuration) *Controller { podInformer := 
informerFactory.Core().V1().Pods() subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets() serviceInformer := informerFactory.Core().V1().Services() + eipInformer := kubeovnInformerFactory.Kubeovn().V1().IptablesEIPs() + natgatewayInformer := kubeovnInformerFactory.Kubeovn().V1().VpcNatGateways() controller := &Controller{ config: config, - podsLister: podInformer.Lister(), - podsSynced: podInformer.Informer().HasSynced, - subnetsLister: subnetInformer.Lister(), - subnetSynced: subnetInformer.Informer().HasSynced, - servicesLister: serviceInformer.Lister(), - servicesSynced: serviceInformer.Informer().HasSynced, + podsLister: podInformer.Lister(), + podsSynced: podInformer.Informer().HasSynced, + subnetsLister: subnetInformer.Lister(), + subnetSynced: subnetInformer.Informer().HasSynced, + servicesLister: serviceInformer.Lister(), + servicesSynced: serviceInformer.Informer().HasSynced, + eipLister: eipInformer.Lister(), + eipSynced: eipInformer.Informer().HasSynced, + natgatewayLister: natgatewayInformer.Lister(), + natgatewaySynced: natgatewayInformer.Informer().HasSynced, informerFactory: informerFactory, kubeovnInformerFactory: kubeovnInformerFactory, @@ -82,14 +96,25 @@ func (c *Controller) Run(stopCh <-chan struct{}) { c.informerFactory.Start(stopCh) c.kubeovnInformerFactory.Start(stopCh) - if !cache.WaitForCacheSync(stopCh, c.podsSynced, c.subnetSynced, c.servicesSynced) { + if !cache.WaitForCacheSync(stopCh, c.podsSynced, c.subnetSynced, c.servicesSynced, c.eipSynced) { util.LogFatalAndExit(nil, "failed to wait for caches to sync") return } klog.Info("Started workers") - go wait.Until(c.syncSubnetRoutes, 5*time.Second, stopCh) + go wait.Until(c.Reconcile, 5*time.Second, stopCh) <-stopCh klog.Info("Shutting down workers") } + +func (c *Controller) Reconcile() { + if c.config.NatGwMode { + err := c.syncEIPRoutes() + if err != nil { + klog.Errorf("failed to reconcile EIPs: %s", err.Error()) + } + } else { + c.syncSubnetRoutes() + } +} diff --git 
a/pkg/speaker/eip.go b/pkg/speaker/eip.go new file mode 100644 index 00000000000..b525c542910 --- /dev/null +++ b/pkg/speaker/eip.go @@ -0,0 +1,56 @@ +package speaker + +import ( + "errors" + "fmt" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + + v1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" +) + +// syncEIPRoutes retrieves all the EIPs attached to our GWs and starts announcing their route +func (c *Controller) syncEIPRoutes() error { + // Retrieve the name of our gateway + gatewayName := getGatewayName() + if gatewayName == "" { + return errors.New("failed to retrieve the name of the gateway, might not be running in a gateway pod") + } + + // Create label requirements to only get EIPs attached to our NAT GW + requirements, err := labels.NewRequirement(util.VpcNatGatewayNameLabel, selection.Equals, []string{gatewayName}) + if err != nil { + return fmt.Errorf("failed to create label selector requirement: %w", err) + } + + // Filter all EIPs attached to our NAT GW + eips, err := c.eipLister.List(labels.NewSelector().Add(*requirements)) + if err != nil { + return fmt.Errorf("failed to list EIPs attached to our GW: %w", err) + } + + return c.announceEIPs(eips) +} + +// announceEIPs announces all the prefixes related to EIPs attached to a GW and withdraws the stale ones +func (c *Controller) announceEIPs(eips []*v1.IptablesEIP) error { + expectedPrefixes := make(prefixMap) + for _, eip := range eips { + // Only announce EIPs marked as "ready" and with the BGP annotation set to true + if eip.Annotations[util.BgpAnnotation] != "true" || !eip.Status.Ready { + continue + } + + if eip.Spec.V4ip != "" { // If we have an IPv4, add it to prefixes we should be announcing + addExpectedPrefix(eip.Spec.V4ip, expectedPrefixes) + } + + if eip.Spec.V6ip != "" { // If we have an IPv6, add it to prefixes we should be announcing + addExpectedPrefix(eip.Spec.V6ip, expectedPrefixes) + } + } + + return c.reconcileRoutes(expectedPrefixes) +} 
diff --git a/pkg/speaker/subnet.go b/pkg/speaker/subnet.go index 47e8178f405..100e9e418fb 100644 --- a/pkg/speaker/subnet.go +++ b/pkg/speaker/subnet.go @@ -2,56 +2,28 @@ package speaker import ( - "context" - "fmt" - "net" - "strconv" "strings" - bgpapi "github.com/osrg/gobgp/v3/api" - bgpapiutil "github.com/osrg/gobgp/v3/pkg/apiutil" - "github.com/osrg/gobgp/v3/pkg/packet/bgp" - "github.com/vishvananda/netlink" - "golang.org/x/sys/unix" - "google.golang.org/protobuf/types/known/anypb" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" - kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" "github.com/kubeovn/kube-ovn/pkg/util" ) const ( - // "cluster" is the default policy + // announcePolicyCluster makes the Pod IPs/Subnet CIDRs be announced from every speaker, whether there's Pods + // that have that specific IP or that are part of the Subnet CIDR on that node. In other words, traffic may enter from + // any node hosting a speaker, and then be internally routed in the cluster to the actual Pod. In this configuration + // extra hops might be used. This is the default policy to Pods and Subnets. announcePolicyCluster = "cluster" - announcePolicyLocal = "local" + // announcePolicyLocal makes the Pod IPs be announced only from speakers on nodes that are actively hosting + // them. In other words, traffic will only enter from nodes hosting Pods marked as needing BGP advertisement, + // or Pods with an IP belonging to a Subnet marked as needing BGP advertisement. This makes the network path shorter. 
+ announcePolicyLocal = "local" ) -func isPodAlive(p *v1.Pod) bool { - if p.Status.Phase == v1.PodSucceeded && p.Spec.RestartPolicy != v1.RestartPolicyAlways { - return false - } - - if p.Status.Phase == v1.PodFailed && p.Spec.RestartPolicy == v1.RestartPolicyNever { - return false - } - - if p.Status.Phase == v1.PodFailed && p.Status.Reason == "Evicted" { - return false - } - return true -} - -func isClusterIPService(svc *v1.Service) bool { - return svc.Spec.Type == v1.ServiceTypeClusterIP && - svc.Spec.ClusterIP != v1.ClusterIPNone && - len(svc.Spec.ClusterIP) != 0 -} - func (c *Controller) syncSubnetRoutes() { - maskMap := map[string]int{kubeovnv1.ProtocolIPv4: 32, kubeovnv1.ProtocolIPv6: 128} - bgpExpected, bgpExists := make(map[string][]string), make(map[string][]string) + bgpExpected := make(prefixMap) subnets, err := c.subnetsLister.List(labels.Everything()) if err != nil { @@ -73,8 +45,7 @@ func (c *Controller) syncSubnetRoutes() { for _, svc := range services { if svc.Annotations != nil && svc.Annotations[util.BgpAnnotation] == "true" && isClusterIPService(svc) { for _, clusterIP := range svc.Spec.ClusterIPs { - ipFamily := util.CheckProtocol(clusterIP) - bgpExpected[ipFamily] = append(bgpExpected[ipFamily], fmt.Sprintf("%s/%d", clusterIP, maskMap[ipFamily])) + addExpectedPrefix(clusterIP, bgpExpected) } } } @@ -139,225 +110,12 @@ func (c *Controller) syncSubnetRoutes() { } } - for ipFamily, ip := range ips { - bgpExpected[ipFamily] = append(bgpExpected[ipFamily], fmt.Sprintf("%s/%d", ip, maskMap[ipFamily])) - } - } - - klog.V(5).Infof("expected announce ipv4 routes: %v, ipv6 routes: %v", bgpExpected[kubeovnv1.ProtocolIPv4], bgpExpected[kubeovnv1.ProtocolIPv6]) - - fn := func(d *bgpapi.Destination) { - for _, path := range d.Paths { - attrInterfaces, _ := bgpapiutil.UnmarshalPathAttributes(path.Pattrs) - nextHop := getNextHopFromPathAttributes(attrInterfaces) - klog.V(5).Infof("the route Prefix is %s, NextHop is %s", d.Prefix, nextHop.String()) - ipFamily 
:= util.CheckProtocol(nextHop.String()) - route, _ := netlink.RouteGet(nextHop) - if len(route) == 1 && route[0].Type == unix.RTN_LOCAL || nextHop.String() == c.config.RouterID { - bgpExists[ipFamily] = append(bgpExists[ipFamily], d.Prefix) - return - } + for _, ip := range ips { + addExpectedPrefix(ip, bgpExpected) } } - if len(c.config.NeighborAddresses) != 0 { - listPathRequest := &bgpapi.ListPathRequest{ - TableType: bgpapi.TableType_GLOBAL, - Family: &bgpapi.Family{Afi: bgpapi.Family_AFI_IP, Safi: bgpapi.Family_SAFI_UNICAST}, - } - if err := c.config.BgpServer.ListPath(context.Background(), listPathRequest, fn); err != nil { - klog.Errorf("failed to list exist route, %v", err) - return - } - - klog.V(5).Infof("exists ipv4 routes %v", bgpExists[kubeovnv1.ProtocolIPv4]) - toAdd, toDel := routeDiff(bgpExpected[kubeovnv1.ProtocolIPv4], bgpExists[kubeovnv1.ProtocolIPv4]) - klog.V(5).Infof("toAdd ipv4 routes %v", toAdd) - for _, route := range toAdd { - if err := c.addRoute(route); err != nil { - klog.Error(err) - } - } - - klog.V(5).Infof("toDel ipv4 routes %v", toDel) - for _, route := range toDel { - if err := c.delRoute(route); err != nil { - klog.Error(err) - } - } - } - - if len(c.config.NeighborIPv6Addresses) != 0 { - listIPv6PathRequest := &bgpapi.ListPathRequest{ - TableType: bgpapi.TableType_GLOBAL, - Family: &bgpapi.Family{Afi: bgpapi.Family_AFI_IP6, Safi: bgpapi.Family_SAFI_UNICAST}, - } - - if err := c.config.BgpServer.ListPath(context.Background(), listIPv6PathRequest, fn); err != nil { - klog.Errorf("failed to list exist route, %v", err) - return - } - - klog.V(5).Infof("exists ipv6 routes %v", bgpExists[kubeovnv1.ProtocolIPv6]) - toAdd, toDel := routeDiff(bgpExpected[kubeovnv1.ProtocolIPv6], bgpExists[kubeovnv1.ProtocolIPv6]) - klog.V(5).Infof("toAdd ipv6 routes %v", toAdd) - - for _, route := range toAdd { - if err := c.addRoute(route); err != nil { - klog.Error(err) - } - } - klog.V(5).Infof("toDel ipv6 routes %v", toDel) - for _, route := range 
toDel { - if err := c.delRoute(route); err != nil { - klog.Error(err) - } - } - } -} - -func routeDiff(expected, exists []string) (toAdd, toDel []string) { - expectedMap, existsMap := map[string]bool{}, map[string]bool{} - for _, e := range expected { - expectedMap[e] = true - } - for _, e := range exists { - existsMap[e] = true - } - - for e := range expectedMap { - if !existsMap[e] { - toAdd = append(toAdd, e) - } - } - - for e := range existsMap { - if !expectedMap[e] { - toDel = append(toDel, e) - } - } - return toAdd, toDel -} - -func parseRoute(route string) (string, uint32, error) { - var prefixLen uint32 = 32 - prefix := route - if strings.Contains(route, "/") { - prefix = strings.Split(route, "/")[0] - strLen := strings.Split(route, "/")[1] - intLen, err := strconv.Atoi(strLen) - if err != nil { - return "", 0, err - } - prefixLen = uint32(intLen) - } - return prefix, prefixLen, nil -} - -func (c *Controller) addRoute(route string) error { - routeAfi := bgpapi.Family_AFI_IP - if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 { - routeAfi = bgpapi.Family_AFI_IP6 - } - - nlri, attrs, err := c.getNlriAndAttrs(route) - if err != nil { - return err - } - for _, attr := range attrs { - _, err = c.config.BgpServer.AddPath(context.Background(), &bgpapi.AddPathRequest{ - Path: &bgpapi.Path{ - Family: &bgpapi.Family{Afi: routeAfi, Safi: bgpapi.Family_SAFI_UNICAST}, - Nlri: nlri, - Pattrs: attr, - }, - }) - if err != nil { - klog.Errorf("add path failed, %v", err) - return err - } - } - return nil -} - -func (c *Controller) getNlriAndAttrs(route string) (*anypb.Any, [][]*anypb.Any, error) { - neighborAddresses := c.config.NeighborAddresses - if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 { - neighborAddresses = c.config.NeighborIPv6Addresses - } - - prefix, prefixLen, err := parseRoute(route) - if err != nil { - return nil, nil, err - } - nlri, _ := anypb.New(&bgpapi.IPAddressPrefix{ - Prefix: prefix, - PrefixLen: prefixLen, - }) - - attrs := 
make([][]*anypb.Any, 0, len(neighborAddresses)) - for _, addr := range neighborAddresses { - a1, _ := anypb.New(&bgpapi.OriginAttribute{ - Origin: 0, - }) - a2, _ := anypb.New(&bgpapi.NextHopAttribute{ - NextHop: c.getNextHopAttribute(addr), - }) - attrs = append(attrs, []*anypb.Any{a1, a2}) - } - - return nlri, attrs, err -} - -func (c *Controller) delRoute(route string) error { - routeAfi := bgpapi.Family_AFI_IP - if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 { - routeAfi = bgpapi.Family_AFI_IP6 - } - - nlri, attrs, err := c.getNlriAndAttrs(route) - if err != nil { - return err - } - for _, attr := range attrs { - err = c.config.BgpServer.DeletePath(context.Background(), &bgpapi.DeletePathRequest{ - Path: &bgpapi.Path{ - Family: &bgpapi.Family{Afi: routeAfi, Safi: bgpapi.Family_SAFI_UNICAST}, - Nlri: nlri, - Pattrs: attr, - }, - }) - if err != nil { - klog.Errorf("del path failed, %v", err) - return err - } - } - return nil -} - -func getNextHopFromPathAttributes(attrs []bgp.PathAttributeInterface) net.IP { - for _, attr := range attrs { - switch a := attr.(type) { - case *bgp.PathAttributeNextHop: - return a.Value - case *bgp.PathAttributeMpReachNLRI: - return a.Nexthop - } - } - return nil -} - -func (c *Controller) getNextHopAttribute(neighborAddress string) string { - nextHop := c.config.RouterID - routes, err := netlink.RouteGet(net.ParseIP(neighborAddress)) - if err == nil && len(routes) == 1 && routes[0].Src != nil { - nextHop = routes[0].Src.String() - } - proto := util.CheckProtocol(nextHop) - nextHopIP := net.ParseIP(nextHop) - nodeIP := c.config.NodeIPs[proto] - if nodeIP != nil && nextHopIP != nil && nextHopIP.Equal(c.config.PodIPs[proto]) { - nextHop = nodeIP.String() + if err := c.reconcileRoutes(bgpExpected); err != nil { + klog.Errorf("failed to reconcile routes: %s", err.Error()) } - return nextHop } diff --git a/pkg/speaker/utils.go b/pkg/speaker/utils.go new file mode 100644 index 00000000000..22e9e6b7b1b --- /dev/null +++ 
b/pkg/speaker/utils.go @@ -0,0 +1,112 @@ +package speaker + +import ( + "errors" + "fmt" + "os" + "strconv" + "strings" + + bgpapi "github.com/osrg/gobgp/v3/api" + v1 "k8s.io/api/core/v1" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" +) + +// prefixMap is a map associating an IP family (IPv4 or IPv6) with the list of prefixes of that family +type prefixMap map[string][]string + +// addExpectedPrefix adds a new prefix to the list of expected prefixes we should be announcing +func addExpectedPrefix(ip string, expectedPrefixes prefixMap) { + ipFamily := util.CheckProtocol(ip) + prefix := fmt.Sprintf("%s/%d", ip, maskMap[ipFamily]) + expectedPrefixes[ipFamily] = append(expectedPrefixes[ipFamily], prefix) +} + +// isPodAlive returns whether a Pod is alive or not +func isPodAlive(p *v1.Pod) bool { + if p.Status.Phase == v1.PodSucceeded && p.Spec.RestartPolicy != v1.RestartPolicyAlways { + return false + } + + if p.Status.Phase == v1.PodFailed && p.Spec.RestartPolicy == v1.RestartPolicyNever { + return false + } + + if p.Status.Phase == v1.PodFailed && p.Status.Reason == "Evicted" { + return false + } + return true +} + +// isClusterIPService returns whether a Service is of type ClusterIP with a non-empty, non-headless cluster IP +func isClusterIPService(svc *v1.Service) bool { + return svc.Spec.Type == v1.ServiceTypeClusterIP && + svc.Spec.ClusterIP != v1.ClusterIPNone && + len(svc.Spec.ClusterIP) != 0 +} + +// routeDiff returns the routes that should be added and the routes that should be deleted +// given the routes we expect to advertise versus the routes we are currently advertising +func routeDiff(expected, exists []string) (toAdd, toDel []string) { + expectedMap, existsMap := map[string]bool{}, map[string]bool{} + for _, e := range expected { + expectedMap[e] = true + } + for _, e := range exists { + existsMap[e] = true + } + + for e := range expectedMap { + if !existsMap[e] { + toAdd = append(toAdd, e) + } + } + + for e := range existsMap { + if !expectedMap[e] { + toDel 
= append(toDel, e) + } + } + + return toAdd, toDel +} + +// parseRoute returns the prefix and length of the prefix (in bits) by parsing the received route +// If no prefix length is mentioned in the route (e.g. 1.1.1.1 instead of 1.1.1.1/32), the prefix length +// is assumed to be 32 bits, regardless of the address family +func parseRoute(route string) (string, uint32, error) { + var prefixLen uint32 = 32 + prefix := route + if strings.Contains(route, "/") { + prefix = strings.Split(route, "/")[0] + strLen := strings.Split(route, "/")[1] + intLen, err := strconv.ParseUint(strLen, 10, 32) + if err != nil { + return "", 0, err + } + prefixLen = uint32(intLen) + } + return prefix, prefixLen, nil +} + +// getGatewayName returns the name of the NAT GW hosting this speaker +func getGatewayName() string { + return os.Getenv(util.GatewayNameEnv) +} + +// kubeOvnFamilyToAFI converts an IP family to its associated AFI +func kubeOvnFamilyToAFI(ipFamily string) (bgpapi.Family_Afi, error) { + var family bgpapi.Family_Afi + switch ipFamily { + case kubeovnv1.ProtocolIPv4: + family = bgpapi.Family_AFI_IP + case kubeovnv1.ProtocolIPv6: + family = bgpapi.Family_AFI_IP6 + default: + return bgpapi.Family_AFI_UNKNOWN, errors.New("ip family is invalid") + } + + return family, nil +} diff --git a/pkg/util/const.go b/pkg/util/const.go index b0d7006b609..c38411a1625 100644 --- a/pkg/util/const.go +++ b/pkg/util/const.go @@ -213,7 +213,8 @@ const ( InternalType = "internal-port" DpdkType = "dpdk-port" - HostnameEnv = "KUBE_NODE_NAME" + HostnameEnv = "KUBE_NODE_NAME" + GatewayNameEnv = "GATEWAY_NAME" MirrosRetryMaxTimes = 5 MirrosRetryInterval = 1