From 61a14acc840a40901eb96b6be14a92d548e1ba26 Mon Sep 17 00:00:00 2001 From: Mat Kowalski Date: Mon, 30 Jan 2023 11:58:00 +0100 Subject: [PATCH] OPNET-197: Extend logic for detecting Node IP When generating keepalived.conf we are relying on the logic to gather IPs of all the cluster nodes for the IP stack used by the specific VIP. This logic currently relies only on the addresses reported as part of Node.Status.Addresses. In some scenarios it may be that the node is not reporting all its IPs via kubelet but still have those available. If we detect such a scenario (e.g. kubelet reporting only IPv4, but VIP being IPv6), we will check for Node annotations created by OVN as those use different source of truth so kubelet not reporting IPs is not affecting it. The newly introduced behaviour is just a fallback in case Node.Status.Addresses does not contain an IP of a requested stack, therefore not changing the behaviour for currently working scenarios. Contributes-to: OPNET-197 --- pkg/config/node.go | 153 ++++++++++++++++++++++++++++------- pkg/config/node_test.go | 146 +++++++++++++++++++++++++++++++++ pkg/monitor/dynkeepalived.go | 12 +-- pkg/monitor/monitor.go | 2 +- 4 files changed, 278 insertions(+), 35 deletions(-) create mode 100644 pkg/config/node_test.go diff --git a/pkg/config/node.go b/pkg/config/node.go index b18a6f0f..bf5c442c 100644 --- a/pkg/config/node.go +++ b/pkg/config/node.go @@ -3,6 +3,7 @@ package config import ( "bufio" "context" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -291,7 +292,10 @@ func IsUpgradeStillRunning(kubeconfigPath string) (bool, error) { return true, nil } -func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig IngressConfig, err error) { +func GetIngressConfig(kubeconfigPath string, vips []string) (IngressConfig, error) { + var machineNetwork string + var ingressConfig IngressConfig + config, err := utils.GetClientConfig("", kubeconfigPath) if err != nil { return ingressConfig, err @@ -306,23 +310,104 @@ func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig return ingressConfig, err } + if len(vips) == 0 { + // This is not necessarily an error path because in handleBootstrapStopKeepalived we do + // call this function without providing any VIPs. Because of this, we only want to mark + // this scenario and avoid trying to calculate the machine networks. + log.Infof("Requested GetIngressConfig for empty VIP list.") + } else { + // As it is not possible to get cluster's Machine Network directly, we are using a workaround + // by detecting which of the local interfaces belongs to the same subnet as requested VIP. + // This interface can be used to detect what was the original machine network as it contains + // the subnet mask that we need. + machineNetwork, err = utils.GetLocalCIDRByIP(vips[0]) + if err != nil { + return ingressConfig, err + } + } + for _, node := range nodes.Items { - for _, address := range node.Status.Addresses { - if address.Type == v1.NodeInternalIP { - if filterIpType != "" { - if (net.ParseIP(filterIpType).To4() != nil && net.ParseIP(address.Address).To4() == nil) || - (net.ParseIP(filterIpType).To4() == nil && net.ParseIP(address.Address).To4() != nil) { - continue - } - } - ingressConfig.Peers = append(ingressConfig.Peers, address.Address) - } + addr, err := getNodeIpForRequestedIpStack(node, vips, machineNetwork) + if err != nil { + log.Warnf("For node %s could not retrieve node's IP. Ignoring", node.ObjectMeta.Name) + } else { + ingressConfig.Peers = append(ingressConfig.Peers, addr) } } return ingressConfig, nil } +func getNodeIpForRequestedIpStack(node v1.Node, filterIps []string, machineNetwork string) (string, error) { + log.Infof("Searching for Node IP of %s. Using '%s' as machine network. Filtering out VIPs '%s'.", node.Name, machineNetwork, filterIps) + + if len(filterIps) == 0 { + return "", fmt.Errorf("for node %s requested NodeIP detection with empty filterIP list. Cannot detect IP stack", node.Name) + } + + isFilterV4 := utils.IsIPv4(net.ParseIP(filterIps[0])) + isFilterV6 := utils.IsIPv6(net.ParseIP(filterIps[0])) + + if !isFilterV4 && !isFilterV6 { + return "", fmt.Errorf("for node %s IPs are neither IPv4 nor IPv6", node.Name) + } + + // We need to collect IP address of a matching IP stack for every node that is part of the + // cluster. We need to account for a scenario where Node.Status.Addresses list is incomplete + // and use different source of the address. + // + // We will use here the following sources: + // 1) Node.Status.Addresses list + // 2) Node annotation "k8s.ovn.org/host-addresses" in combination with Machine Networks + // + // If none of those returns a conclusive result, we don't return an IP for this node. This is + // not a desired outcome, but can be extended in the future if desired. + + var addr string + for _, address := range node.Status.Addresses { + if address.Type == v1.NodeInternalIP { + if (utils.IsIPv4(net.ParseIP(address.Address)) && isFilterV4) || (utils.IsIPv6(net.ParseIP(address.Address)) && isFilterV6) { + addr = address.Address + log.Infof("For node %s selected peer address %s using NodeInternalIP", node.Name, addr) + } + } + } + if addr == "" { + log.Infof("For node %s can't find address using NodeInternalIP. Fallback to OVN annotation.", node.Name) + + var ovnHostAddresses []string + if err := json.Unmarshal([]byte(node.Annotations["k8s.ovn.org/host-addresses"]), &ovnHostAddresses); err != nil { + log.Warnf("Couldn't unmarshall OVN annotations: '%s'. Skipping.", node.Annotations["k8s.ovn.org/host-addresses"]) + } + + AddrList: + for _, hostAddr := range ovnHostAddresses { + for _, filterIp := range filterIps { + if hostAddr == filterIp { + log.Infof("Address %s is VIP. Skipping.", hostAddr) + continue AddrList + } + } + + if (utils.IsIPv4(net.ParseIP(hostAddr)) && !isFilterV4) || (utils.IsIPv6(net.ParseIP(hostAddr)) && !isFilterV6) { + log.Infof("Address %s doesn't match requested IP stack. Skipping.", hostAddr) + continue + } + + match, err := utils.IpInCidr(hostAddr, machineNetwork) + if err != nil { + log.Infof("Address '%s' and subnet '%s' couldn't be parsed. Skipping.", hostAddr, machineNetwork) + continue + } + if match { + addr = hostAddr + log.Infof("For node %s selected peer address %s using using OVN annotations.", node.Name, addr) + } + } + } + return addr, nil +} + // Returns a Node object populated with the configuration specified by the parameters // to the function. // kubeconfigPath: The path to a kubeconfig that can be used to read cluster status @@ -457,8 +542,7 @@ func getNodeConfig(kubeconfigPath, clusterConfigPath, resolvConfPath string, api // getSortedBackends builds config to communicate with kube-api based on kubeconfigPath parameter value, if kubeconfigPath is not empty it will build the // config based on that content else config will point to localhost. -func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.IP) (backends []Backend, err error) { - +func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, vips []net.IP) (backends []Backend, err error) { kubeApiServerUrl := "" if readFromLocalAPI { kubeApiServerUrl = localhostKubeApiServerUrl @@ -483,19 +567,28 @@ func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net. }).Info("Failed to get master Nodes list") return []Backend{}, err } - apiVipv6 := utils.IsIPv6(apiVip) + if len(vips) == 0 { + return []Backend{}, fmt.Errorf("Trying to build config using empty VIPs") + } + + // As it is not possible to get cluster's Machine Network directly, we are using a workaround + // by detecting which of the local interfaces belongs to the same subnet as requested VIP. + // This interface can be used to detect what was the original machine network as it contains + // the subnet mask that we need. + machineNetwork, err := utils.GetLocalCIDRByIP(vips[0].String()) + if err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Could not retrieve subnet for IP %s", vips[0].String()) + return []Backend{}, err + } + for _, node := range nodes.Items { - masterIp := "" - for _, address := range node.Status.Addresses { - if address.Type == v1.NodeInternalIP && utils.IsIPv6(net.ParseIP(address.Address)) == apiVipv6 { - masterIp = address.Address - break - } - } - if masterIp != "" { - backends = append(backends, Backend{Host: node.ObjectMeta.Name, Address: masterIp}) + masterIp, err := getNodeIpForRequestedIpStack(node, utils.ConvertIpsToStrings(vips), machineNetwork) + if err != nil { + log.Warnf("Could not retrieve node's IP for %s. Ignoring", node.ObjectMeta.Name) } else { - log.Warnf("Could not retrieve node's IP for %s", node.ObjectMeta.Name) + backends = append(backends, Backend{Host: node.ObjectMeta.Name, Address: masterIp}) } } @@ -505,23 +598,27 @@ func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net. return backends, err } -func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, apiVip net.IP) (ApiLBConfig, error) { +func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, vips []net.IP) (ApiLBConfig, error) { config := ApiLBConfig{ ApiPort: apiPort, LbPort: lbPort, StatPort: statPort, } + if len(vips) == 0 { + return config, fmt.Errorf("Trying to generate loadbalancer config using empty VIPs") + } + // LB frontend address: IPv6 '::' , IPv4 '' - if apiVip.To4() == nil { + if utils.IsIPv6(vips[0]) { config.FrontendAddr = "::" } // Try reading master nodes details first from api-vip:kube-apiserver and failover to localhost:kube-apiserver - backends, err := getSortedBackends(kubeconfigPath, false, apiVip) + backends, err := getSortedBackends(kubeconfigPath, false, vips) if err != nil { log.Infof("An error occurred while trying to read master nodes details from api-vip:kube-apiserver: %v", err) log.Infof("Trying to read master nodes details from localhost:kube-apiserver") - backends, err = getSortedBackends(kubeconfigPath, true, apiVip) + backends, err = getSortedBackends(kubeconfigPath, true, vips) if err != nil { log.WithFields(logrus.Fields{ "kubeconfigPath": kubeconfigPath, diff --git a/pkg/config/node_test.go b/pkg/config/node_test.go new file mode 100644 index 00000000..98eb5223 --- /dev/null +++ b/pkg/config/node_test.go @@ -0,0 +1,146 @@ +package config + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + testOvnHostAddressesAnnotation = map[string]string{ + "k8s.ovn.org/host-addresses": "[\"192.168.1.102\",\"192.168.1.99\",\"192.168.1.101\",\"fd00::101\",\"2001:db8::49a\",\"fd00::102\",\"fd00::5\",\"fd69::2\"]", + } + + testNodeDualStack1 = v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "testNode"}, + Status: v1.NodeStatus{Addresses: []v1.NodeAddress{ + {Type: "InternalIP", Address: "192.168.1.99"}, + {Type: "InternalIP", Address: "fd00::5"}, + {Type: "ExternalIP", Address: "172.16.1.99"}, + }}} + testNodeDualStack2 = v1.Node{ + + Status: v1.NodeStatus{Addresses: []v1.NodeAddress{ + {Type: "InternalIP", Address: "192.168.1.99"}, + {Type: "ExternalIP", Address: "172.16.1.99"}, + }}, + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode", + Annotations: testOvnHostAddressesAnnotation, + }, + } + testNodeDualStack3 = v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode", + Annotations: testOvnHostAddressesAnnotation, + }, + } + testNodeSingleStackV4 = v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "testNode"}, + Status: v1.NodeStatus{Addresses: []v1.NodeAddress{ + {Type: "InternalIP", Address: "192.168.1.99"}, + {Type: "ExternalIP", Address: "172.16.1.99"}, + }}} + testNodeSingleStackV6 = v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "testNode"}, + Status: v1.NodeStatus{Addresses: []v1.NodeAddress{ + {Type: "InternalIP", Address: "fd00::5"}, + {Type: "ExternalIP", Address: "2001:db8::49a"}, + }}} + + testMachineNetworkV4 = "192.168.1.0/24" + testMachineNetworkV6 = "fd00::5/64" + testApiVipV4 = "192.168.1.101" + testApiVipV6 = "fd00::101" + testIngressVipV4 = "192.168.1.102" + testIngressVipV6 = "fd00::102" +) + +var _ = Describe("getNodePeersForIpStack", func() { + Context("for dual-stack node", func() { + Context("with address only in status", func() { + It("matches an IPv4 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeDualStack1, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4) + Expect(res).To(Equal("192.168.1.99")) + Expect(err).To(BeNil()) + }) + It("matches an IPv6 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeDualStack1, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6) + Expect(res).To(Equal("fd00::5")) + Expect(err).To(BeNil()) + }) + }) + + Context("with address only in OVN annotation", func() { + It("matches an IPv4 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeDualStack3, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4) + Expect(res).To(Equal("192.168.1.99")) + Expect(err).To(BeNil()) + }) + It("matches an IPv6 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeDualStack3, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6) + Expect(res).To(Equal("fd00::5")) + Expect(err).To(BeNil()) + }) + }) + + Context("with address in status and OVN annotation", func() { + It("matches an IPv4 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeDualStack2, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4) + Expect(res).To(Equal("192.168.1.99")) + Expect(err).To(BeNil()) + }) + It("matches an IPv6 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeDualStack2, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6) + Expect(res).To(Equal("fd00::5")) + Expect(err).To(BeNil()) + }) + }) + }) + + Context("for single-stack v4 node", func() { + It("matches an IPv4 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4) + Expect(res).To(Equal("192.168.1.99")) + Expect(err).To(BeNil()) + }) + It("empty for IPv6 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6) + Expect(res).To(Equal("")) + Expect(err).To(BeNil()) + }) + }) + + Context("for single-stack v6 node", func() { + It("empty for IPv4 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV6, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4) + Expect(res).To(Equal("")) + Expect(err).To(BeNil()) + }) + It("matches an IPv6 VIP", func() { + res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV6, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6) + Expect(res).To(Equal("fd00::5")) + Expect(err).To(BeNil()) + }) + }) + + It("empty for empty node", func() { + res, err := getNodeIpForRequestedIpStack(v1.Node{}, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4) + Expect(res).To(Equal("")) + Expect(err).To(BeNil()) + }) + + It("empty for node with IPs and empty VIP requested", func() { + res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{}, testMachineNetworkV4) + Expect(res).To(Equal("")) + Expect(err.Error()).To(Equal("for node testNode requested NodeIP detection with empty filterIP list. Cannot detect IP stack")) + }) +}) + +func Test(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Config tests") +} diff --git a/pkg/monitor/dynkeepalived.go b/pkg/monitor/dynkeepalived.go index bc8787fc..1cabdc41 100644 --- a/pkg/monitor/dynkeepalived.go +++ b/pkg/monitor/dynkeepalived.go @@ -64,23 +64,23 @@ func updateUnicastConfig(kubeconfigPath string, newConfig *config.Node) { if !newConfig.EnableUnicast { return } - newConfig.IngressConfig, err = config.GetIngressConfig(kubeconfigPath, newConfig.Cluster.APIVIP) + newConfig.IngressConfig, err = config.GetIngressConfig(kubeconfigPath, []string{newConfig.Cluster.APIVIP, newConfig.Cluster.IngressVIP}) if err != nil { log.Warnf("Could not retrieve ingress config: %v", err) } - newConfig.LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(newConfig.Cluster.APIVIP)) + newConfig.LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, []net.IP{net.ParseIP(newConfig.Cluster.APIVIP), net.ParseIP(newConfig.Cluster.IngressVIP)}) if err != nil { log.Warnf("Could not retrieve LB config: %v", err) } for i, c := range *newConfig.Configs { // Must do this by index instead of using c because c is local to this loop - (*newConfig.Configs)[i].IngressConfig, err = config.GetIngressConfig(kubeconfigPath, c.Cluster.APIVIP) + (*newConfig.Configs)[i].IngressConfig, err = config.GetIngressConfig(kubeconfigPath, []string{c.Cluster.APIVIP, c.Cluster.IngressVIP}) if err != nil { log.Warnf("Could not retrieve ingress config: %v", err) } - (*newConfig.Configs)[i].LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(c.Cluster.APIVIP)) + (*newConfig.Configs)[i].LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, []net.IP{net.ParseIP(c.Cluster.APIVIP), net.ParseIP(c.Cluster.IngressVIP)}) if err != nil { log.Warnf("Could not retrieve LB config: %v", err) } @@ -147,7 +147,7 @@ func handleBootstrapStopKeepalived(kubeconfigPath string, bootstrapStopKeepalive first that it's operational. */ log.Info("handleBootstrapStopKeepalived: verify first that local kube-apiserver is operational") for start := time.Now(); time.Since(start) < time.Second*30; { - if _, err := config.GetIngressConfig(kubeconfigPath, ""); err == nil { + if _, err := config.GetIngressConfig(kubeconfigPath, []string{}); err == nil { log.Info("handleBootstrapStopKeepalived: local kube-apiserver is operational") break } @@ -156,7 +156,7 @@ func handleBootstrapStopKeepalived(kubeconfigPath string, bootstrapStopKeepalive } for { - if _, err := config.GetIngressConfig(kubeconfigPath, ""); err != nil { + if _, err := config.GetIngressConfig(kubeconfigPath, []string{}); err != nil { // We have started to talk to Ironic through the API VIP as well, // so if Ironic is still up then we need to keep the VIP, even if // the apiserver has gone down. diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 34317933..355dc7d7 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -58,7 +58,7 @@ func Monitor(kubeconfigPath, clusterName, clusterDomain, templatePath, cfgPath s } return nil default: - config, err := config.GetLBConfig(kubeconfigPath, apiPort, lbPort, statPort, net.ParseIP(apiVips[0])) + config, err := config.GetLBConfig(kubeconfigPath, apiPort, lbPort, statPort, []net.IP{net.ParseIP(apiVips[0])}) if err != nil { log.WithFields(logrus.Fields{ "kubeconfigPath": kubeconfigPath,