diff --git a/pkg/config/node.go b/pkg/config/node.go
index b18a6f0f..bf5c442c 100644
--- a/pkg/config/node.go
+++ b/pkg/config/node.go
@@ -3,6 +3,7 @@ package config
 import (
 	"bufio"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -291,7 +292,10 @@ func IsUpgradeStillRunning(kubeconfigPath string) (bool, error) {
 	return true, nil
 }
 
-func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig IngressConfig, err error) {
+func GetIngressConfig(kubeconfigPath string, vips []string) (IngressConfig, error) {
+	var machineNetwork string
+	var ingressConfig IngressConfig
+
 	config, err := utils.GetClientConfig("", kubeconfigPath)
 	if err != nil {
 		return ingressConfig, err
@@ -306,23 +310,104 @@ func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig
 		return ingressConfig, err
 	}
 
+	if len(vips) == 0 {
+		// This is not necessarily an error path because in handleBootstrapStopKeepalived we do
+		// call this function without providing any VIPs. Because of this, we only want to mark
+		// this scenario and avoid trying to calculate the machine networks.
+		log.Infof("Requested GetIngressConfig for empty VIP list.")
+	} else {
+		// As it is not possible to get the cluster's Machine Network directly, we are using a
+		// workaround by detecting which of the local interfaces belongs to the same subnet as
+		// the requested VIP. This interface can be used to detect what the original machine
+		// network was, as it contains the subnet mask that we need.
+		machineNetwork, err = utils.GetLocalCIDRByIP(vips[0])
+		if err != nil {
+			return ingressConfig, err
+		}
+	}
+
 	for _, node := range nodes.Items {
-		for _, address := range node.Status.Addresses {
-			if address.Type == v1.NodeInternalIP {
-				if filterIpType != "" {
-					if (net.ParseIP(filterIpType).To4() != nil && net.ParseIP(address.Address).To4() == nil) ||
-						(net.ParseIP(filterIpType).To4() == nil && net.ParseIP(address.Address).To4() != nil) {
-						continue
-					}
-				}
-				ingressConfig.Peers = append(ingressConfig.Peers, address.Address)
-			}
+		addr, err := getNodeIpForRequestedIpStack(node, vips, machineNetwork)
+		if err != nil {
+			log.Warnf("For node %s could not retrieve node's IP. Ignoring", node.ObjectMeta.Name)
+		} else {
+			ingressConfig.Peers = append(ingressConfig.Peers, addr)
 		}
 	}
 
 	return ingressConfig, nil
 }
 
+func getNodeIpForRequestedIpStack(node v1.Node, filterIps []string, machineNetwork string) (string, error) {
+	log.Infof("Searching for Node IP of %s. Using '%s' as machine network. Filtering out VIPs '%s'.", node.Name, machineNetwork, filterIps)
+
+	if len(filterIps) == 0 {
+		return "", fmt.Errorf("for node %s requested NodeIP detection with empty filterIP list. Cannot detect IP stack", node.Name)
+	}
+
+	isFilterV4 := utils.IsIPv4(net.ParseIP(filterIps[0]))
+	isFilterV6 := utils.IsIPv6(net.ParseIP(filterIps[0]))
+
+	if !isFilterV4 && !isFilterV6 {
+		return "", fmt.Errorf("for node %s IPs are neither IPv4 nor IPv6", node.Name)
+	}
+
+	// We need to collect the IP address of a matching IP stack for every node that is part of the
+	// cluster. We also need to account for a scenario where the Node.Status.Addresses list is
+	// incomplete and use a different source for the address.
+	//
+	// We will use the following sources:
+	// 1) Node.Status.Addresses list
+	// 2) Node annotation "k8s.ovn.org/host-addresses" in combination with Machine Networks
+	//
+	// If none of those returns a conclusive result, we don't return an IP for this node. This is
+	// not a desired outcome, but can be extended in the future if desired.
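+	//
+	// For illustration, the OVN annotation value is a JSON-encoded list of host addresses, e.g.
+	// (taken from the test fixture in node_test.go below):
+	//   k8s.ovn.org/host-addresses: ["192.168.1.102","192.168.1.99","192.168.1.101","fd00::101",...]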
+
+	var addr string
+	for _, address := range node.Status.Addresses {
+		if address.Type == v1.NodeInternalIP {
+			if (utils.IsIPv4(net.ParseIP(address.Address)) && isFilterV4) || (utils.IsIPv6(net.ParseIP(address.Address)) && isFilterV6) {
+				addr = address.Address
+				log.Infof("For node %s selected peer address %s using NodeInternalIP", node.Name, addr)
+			}
+		}
+	}
+	if addr == "" {
+		log.Infof("For node %s can't find address using NodeInternalIP. Fallback to OVN annotation.", node.Name)
+
+		var ovnHostAddresses []string
+		if err := json.Unmarshal([]byte(node.Annotations["k8s.ovn.org/host-addresses"]), &ovnHostAddresses); err != nil {
+			log.Warnf("Couldn't unmarshal OVN annotations: '%s'. Skipping.", node.Annotations["k8s.ovn.org/host-addresses"])
+		}
+
+	AddrList:
+		for _, hostAddr := range ovnHostAddresses {
+			for _, filterIp := range filterIps {
+				if hostAddr == filterIp {
+					log.Infof("Address %s is VIP. Skipping.", hostAddr)
+					continue AddrList
+				}
+			}
+
+			if (utils.IsIPv4(net.ParseIP(hostAddr)) && !isFilterV4) || (utils.IsIPv6(net.ParseIP(hostAddr)) && !isFilterV6) {
+				log.Infof("Address %s doesn't match requested IP stack. Skipping.", hostAddr)
+				continue
+			}
+
+			match, err := utils.IpInCidr(hostAddr, machineNetwork)
+			if err != nil {
+				log.Infof("Address '%s' and subnet '%s' couldn't be parsed. Skipping.", hostAddr, machineNetwork)
+				continue
+			}
+			if match {
+				addr = hostAddr
+				log.Infof("For node %s selected peer address %s using OVN annotations.", node.Name, addr)
+			}
+		}
+	}
+	return addr, nil
+}
+
 // Returns a Node object populated with the configuration specified by the parameters
 // to the function.
 // kubeconfigPath: The path to a kubeconfig that can be used to read cluster status
@@ -457,8 +542,7 @@ func getNodeConfig(kubeconfigPath, clusterConfigPath, resolvConfPath string, api
 
 // getSortedBackends builds config to communicate with kube-api based on kubeconfigPath parameter value, if kubeconfigPath is not empty it will build the
 // config based on that content else config will point to localhost.
-func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.IP) (backends []Backend, err error) {
-
+func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, vips []net.IP) (backends []Backend, err error) {
 	kubeApiServerUrl := ""
 	if readFromLocalAPI {
 		kubeApiServerUrl = localhostKubeApiServerUrl
@@ -483,19 +567,28 @@ func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.
 		}).Info("Failed to get master Nodes list")
 		return []Backend{}, err
 	}
-	apiVipv6 := utils.IsIPv6(apiVip)
+	if len(vips) == 0 {
+		return []Backend{}, fmt.Errorf("Trying to build config using empty VIPs")
+	}
+
+	// As it is not possible to get the cluster's Machine Network directly, we are using a
+	// workaround by detecting which of the local interfaces belongs to the same subnet as
+	// the requested VIP. This interface can be used to detect what the original machine
+	// network was, as it contains the subnet mask that we need.
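+	// For example (values purely illustrative): for a VIP such as 192.168.1.101 hosted on an
+	// interface configured with 192.168.1.10/24, the detected machine network is 192.168.1.0/24.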
+	machineNetwork, err := utils.GetLocalCIDRByIP(vips[0].String())
+	if err != nil {
+		log.WithFields(logrus.Fields{
+			"err": err,
+		}).Errorf("Could not retrieve subnet for IP %s", vips[0].String())
+		return []Backend{}, err
+	}
+
 	for _, node := range nodes.Items {
-		masterIp := ""
-		for _, address := range node.Status.Addresses {
-			if address.Type == v1.NodeInternalIP && utils.IsIPv6(net.ParseIP(address.Address)) == apiVipv6 {
-				masterIp = address.Address
-				break
-			}
-		}
-		if masterIp != "" {
-			backends = append(backends, Backend{Host: node.ObjectMeta.Name, Address: masterIp})
+		masterIp, err := getNodeIpForRequestedIpStack(node, utils.ConvertIpsToStrings(vips), machineNetwork)
+		if err != nil {
+			log.Warnf("Could not retrieve node's IP for %s. Ignoring", node.ObjectMeta.Name)
 		} else {
-			log.Warnf("Could not retrieve node's IP for %s", node.ObjectMeta.Name)
+			backends = append(backends, Backend{Host: node.ObjectMeta.Name, Address: masterIp})
 		}
 	}
 
@@ -505,23 +598,27 @@
-func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, apiVip net.IP) (ApiLBConfig, error) {
+func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, vips []net.IP) (ApiLBConfig, error) {
 	config := ApiLBConfig{
 		ApiPort:  apiPort,
 		LbPort:   lbPort,
 		StatPort: statPort,
 	}
+	if len(vips) == 0 {
+		return config, fmt.Errorf("Trying to generate loadbalancer config using empty VIPs")
+	}
+
 	// LB frontend address: IPv6 '::' , IPv4 ''
-	if apiVip.To4() == nil {
+	if utils.IsIPv6(vips[0]) {
 		config.FrontendAddr = "::"
 	}
 	// Try reading master nodes details first from api-vip:kube-apiserver and failover to localhost:kube-apiserver
-	backends, err := getSortedBackends(kubeconfigPath, false, apiVip)
+	backends, err := getSortedBackends(kubeconfigPath, false, vips)
 	if err != nil {
 		log.Infof("An error occurred while trying to read master nodes details from api-vip:kube-apiserver: %v", err)
 		log.Infof("Trying to read master nodes details from localhost:kube-apiserver")
-		backends, err = getSortedBackends(kubeconfigPath, true, apiVip)
+		backends, err = getSortedBackends(kubeconfigPath, true, vips)
 		if err != nil {
 			log.WithFields(logrus.Fields{
 				"kubeconfigPath": kubeconfigPath,
diff --git a/pkg/config/node_test.go b/pkg/config/node_test.go
new file mode 100644
index 00000000..98eb5223
--- /dev/null
+++ b/pkg/config/node_test.go
@@ -0,0 +1,146 @@
+package config
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var (
+	testOvnHostAddressesAnnotation = map[string]string{
+		"k8s.ovn.org/host-addresses": "[\"192.168.1.102\",\"192.168.1.99\",\"192.168.1.101\",\"fd00::101\",\"2001:db8::49a\",\"fd00::102\",\"fd00::5\",\"fd69::2\"]",
+	}
+
+	testNodeDualStack1 = v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: "testNode"},
+		Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
+			{Type: "InternalIP", Address: "192.168.1.99"},
+			{Type: "InternalIP", Address: "fd00::5"},
+			{Type: "ExternalIP", Address: "172.16.1.99"},
+		}}}
+	testNodeDualStack2 = v1.Node{
+		Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
+			{Type: "InternalIP", Address: "192.168.1.99"},
+			{Type: "ExternalIP", Address: "172.16.1.99"},
+		}},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        "testNode",
+			Annotations: testOvnHostAddressesAnnotation,
+		},
+	}
+	testNodeDualStack3 = v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        "testNode",
+			Annotations: testOvnHostAddressesAnnotation,
+		},
+	}
+	testNodeSingleStackV4 = v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: "testNode"},
+		Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
+			{Type: "InternalIP", Address: "192.168.1.99"},
+			{Type: "ExternalIP", Address: "172.16.1.99"},
+		}}}
+	testNodeSingleStackV6 = v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: "testNode"},
+		Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
+			{Type: "InternalIP", Address: "fd00::5"},
+			{Type: "ExternalIP", Address: "2001:db8::49a"},
+		}}}
+
+	testMachineNetworkV4 = "192.168.1.0/24"
+	testMachineNetworkV6 = "fd00::5/64"
+	testApiVipV4         = "192.168.1.101"
+	testApiVipV6         = "fd00::101"
+	testIngressVipV4     = "192.168.1.102"
+	testIngressVipV6     = "fd00::102"
+)
+
+var _ = Describe("getNodePeersForIpStack", func() {
+	Context("for dual-stack node", func() {
+		Context("with address only in status", func() {
+			It("matches an IPv4 VIP", func() {
+				res, err := getNodeIpForRequestedIpStack(testNodeDualStack1, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
+				Expect(res).To(Equal("192.168.1.99"))
+				Expect(err).To(BeNil())
+			})
+			It("matches an IPv6 VIP", func() {
+				res, err := getNodeIpForRequestedIpStack(testNodeDualStack1, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
+				Expect(res).To(Equal("fd00::5"))
+				Expect(err).To(BeNil())
+			})
+		})
+
+		Context("with address only in OVN annotation", func() {
+			It("matches an IPv4 VIP", func() {
+				res, err := getNodeIpForRequestedIpStack(testNodeDualStack3, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
+				Expect(res).To(Equal("192.168.1.99"))
+				Expect(err).To(BeNil())
+			})
+			It("matches an IPv6 VIP", func() {
+				res, err := getNodeIpForRequestedIpStack(testNodeDualStack3, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
+				Expect(res).To(Equal("fd00::5"))
+				Expect(err).To(BeNil())
+			})
+		})
+
+		Context("with address in status and OVN annotation", func() {
+			It("matches an IPv4 VIP", func() {
+				res, err := getNodeIpForRequestedIpStack(testNodeDualStack2, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
+				Expect(res).To(Equal("192.168.1.99"))
+				Expect(err).To(BeNil())
+			})
+			It("matches an IPv6 VIP", func() {
+				res, err := getNodeIpForRequestedIpStack(testNodeDualStack2, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
+				Expect(res).To(Equal("fd00::5"))
+				Expect(err).To(BeNil())
+			})
+		})
+	})
+
+	Context("for single-stack v4 node", func() {
+		It("matches an IPv4 VIP", func() {
+			res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
+			Expect(res).To(Equal("192.168.1.99"))
+			Expect(err).To(BeNil())
+		})
+		It("empty for IPv6 VIP", func() {
+			res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
+			Expect(res).To(Equal(""))
+			Expect(err).To(BeNil())
+		})
+	})
+
+	Context("for single-stack v6 node", func() {
+		It("empty for IPv4 VIP", func() {
+			res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV6, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
+			Expect(res).To(Equal(""))
+			Expect(err).To(BeNil())
+		})
+		It("matches an IPv6 VIP", func() {
+			res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV6, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
+			Expect(res).To(Equal("fd00::5"))
+			Expect(err).To(BeNil())
+		})
+	})
+
+	It("empty for empty node", func() {
+		res, err := getNodeIpForRequestedIpStack(v1.Node{}, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
+		Expect(res).To(Equal(""))
+		Expect(err).To(BeNil())
+	})
+
+	It("empty for node with IPs and empty VIP requested", func() {
+		res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{}, testMachineNetworkV4)
+		Expect(res).To(Equal(""))
+		Expect(err.Error()).To(Equal("for node testNode requested NodeIP detection with empty filterIP list. Cannot detect IP stack"))
+	})
+})
+
+func Test(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Config tests")
+}
diff --git a/pkg/monitor/dynkeepalived.go b/pkg/monitor/dynkeepalived.go
index bc8787fc..1cabdc41 100644
--- a/pkg/monitor/dynkeepalived.go
+++ b/pkg/monitor/dynkeepalived.go
@@ -64,23 +64,23 @@ func updateUnicastConfig(kubeconfigPath string, newConfig *config.Node) {
 	if !newConfig.EnableUnicast {
 		return
 	}
-	newConfig.IngressConfig, err = config.GetIngressConfig(kubeconfigPath, newConfig.Cluster.APIVIP)
+	newConfig.IngressConfig, err = config.GetIngressConfig(kubeconfigPath, []string{newConfig.Cluster.APIVIP, newConfig.Cluster.IngressVIP})
 	if err != nil {
 		log.Warnf("Could not retrieve ingress config: %v", err)
 	}
-	newConfig.LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(newConfig.Cluster.APIVIP))
+	newConfig.LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, []net.IP{net.ParseIP(newConfig.Cluster.APIVIP), net.ParseIP(newConfig.Cluster.IngressVIP)})
 	if err != nil {
 		log.Warnf("Could not retrieve LB config: %v", err)
 	}
 
 	for i, c := range *newConfig.Configs {
 		// Must do this by index instead of using c because c is local to this loop
-		(*newConfig.Configs)[i].IngressConfig, err = config.GetIngressConfig(kubeconfigPath, c.Cluster.APIVIP)
+		(*newConfig.Configs)[i].IngressConfig, err = config.GetIngressConfig(kubeconfigPath, []string{c.Cluster.APIVIP, c.Cluster.IngressVIP})
 		if err != nil {
 			log.Warnf("Could not retrieve ingress config: %v", err)
 		}
-		(*newConfig.Configs)[i].LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(c.Cluster.APIVIP))
+		(*newConfig.Configs)[i].LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, []net.IP{net.ParseIP(c.Cluster.APIVIP), net.ParseIP(c.Cluster.IngressVIP)})
 		if err != nil {
 			log.Warnf("Could not retrieve LB config: %v", err)
 		}
@@ -147,7 +147,7 @@ func handleBootstrapStopKeepalived(kubeconfigPath string, bootstrapStopKeepalive
 	   first that it's operational. */
 	log.Info("handleBootstrapStopKeepalived: verify first that local kube-apiserver is operational")
 	for start := time.Now(); time.Since(start) < time.Second*30; {
-		if _, err := config.GetIngressConfig(kubeconfigPath, ""); err == nil {
+		if _, err := config.GetIngressConfig(kubeconfigPath, []string{}); err == nil {
 			log.Info("handleBootstrapStopKeepalived: local kube-apiserver is operational")
 			break
 		}
@@ -156,7 +156,7 @@ func handleBootstrapStopKeepalived(kubeconfigPath string, bootstrapStopKeepalive
 	}
 
 	for {
-		if _, err := config.GetIngressConfig(kubeconfigPath, ""); err != nil {
+		if _, err := config.GetIngressConfig(kubeconfigPath, []string{}); err != nil {
 			// We have started to talk to Ironic through the API VIP as well,
 			// so if Ironic is still up then we need to keep the VIP, even if
 			// the apiserver has gone down.
diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go
index 34317933..355dc7d7 100644
--- a/pkg/monitor/monitor.go
+++ b/pkg/monitor/monitor.go
@@ -58,7 +58,7 @@ func Monitor(kubeconfigPath, clusterName, clusterDomain, templatePath, cfgPath s
 			}
 			return nil
 		default:
-			config, err := config.GetLBConfig(kubeconfigPath, apiPort, lbPort, statPort, net.ParseIP(apiVips[0]))
+			config, err := config.GetLBConfig(kubeconfigPath, apiPort, lbPort, statPort, []net.IP{net.ParseIP(apiVips[0])})
 			if err != nil {
 				log.WithFields(logrus.Fields{
 					"kubeconfigPath": kubeconfigPath,
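For reference: utils.GetLocalCIDRByIP is not part of this diff. Assuming it implements the workaround described in the comments above (find the local interface address whose subnet contains the given VIP and return that subnet in CIDR notation), a minimal sketch of such a helper could look like the following. The name localCIDRByIP and all details below are illustrative assumptions, not the actual pkg/utils implementation.

package utils

import (
	"fmt"
	"net"
)

// localCIDRByIP returns the CIDR of the local interface subnet that contains ip.
// Sketch only; the real helper in pkg/utils may differ.
func localCIDRByIP(ip string) (string, error) {
	parsed := net.ParseIP(ip)
	if parsed == nil {
		return "", fmt.Errorf("%q is not a valid IP address", ip)
	}
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", err
	}
	for _, addr := range addrs {
		// Interface addresses are reported in CIDR form, e.g. "192.168.1.10/24".
		_, ipNet, err := net.ParseCIDR(addr.String())
		if err != nil {
			continue
		}
		if ipNet.Contains(parsed) {
			// ParseCIDR masks the address, so this is the subnet, e.g. "192.168.1.0/24".
			return ipNet.String(), nil
		}
	}
	return "", fmt.Errorf("no local subnet contains %s", ip)
}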