Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OPNET-197: Extend logic for detecting Node IP #218

Merged
merged 1 commit into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 125 additions & 28 deletions pkg/config/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -291,7 +292,10 @@ func IsUpgradeStillRunning(kubeconfigPath string) (bool, error) {
return true, nil
}

func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig IngressConfig, err error) {
func GetIngressConfig(kubeconfigPath string, vips []string) (IngressConfig, error) {
var machineNetwork string
var ingressConfig IngressConfig

config, err := utils.GetClientConfig("", kubeconfigPath)
if err != nil {
return ingressConfig, err
Expand All @@ -306,23 +310,104 @@ func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig
return ingressConfig, err
}

if len(vips) == 0 {
// This is not necessarily an error path because in handleBootstrapStopKeepalived we do
// call this function without providing any VIPs. Because of this, we only want to mark
// this scenario and avoid trying to calculate the machine networks.
log.Infof("Requested GetIngressConfig for empty VIP list.")
} else {
// As it is not possible to get cluster's Machine Network directly, we are using a workaround
// by detecting which of the local interfaces belongs to the same subnet as requested VIP.
// This interface can be used to detect what was the original machine network as it contains
// the subnet mask that we need.
machineNetwork, err = utils.GetLocalCIDRByIP(vips[0])
if err != nil {
return ingressConfig, err
}
}

for _, node := range nodes.Items {
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP {
if filterIpType != "" {
if (net.ParseIP(filterIpType).To4() != nil && net.ParseIP(address.Address).To4() == nil) ||
(net.ParseIP(filterIpType).To4() == nil && net.ParseIP(address.Address).To4() != nil) {
continue
}
}
ingressConfig.Peers = append(ingressConfig.Peers, address.Address)
}
addr, err := getNodeIpForRequestedIpStack(node, vips, machineNetwork)
if err != nil {
log.Warnf("For node %s could not retrieve node's IP. Ignoring", node.ObjectMeta.Name)
} else {
ingressConfig.Peers = append(ingressConfig.Peers, addr)
}
}

return ingressConfig, nil
}

func getNodeIpForRequestedIpStack(node v1.Node, filterIps []string, machineNetwork string) (string, error) {
log.Infof("Searching for Node IP of %s. Using '%s' as machine network. Filtering out VIPs '%s'.", node.Name, machineNetwork, filterIps)

if len(filterIps) == 0 {
return "", fmt.Errorf("for node %s requested NodeIP detection with empty filterIP list. Cannot detect IP stack", node.Name)
}

isFilterV4 := utils.IsIPv4(net.ParseIP(filterIps[0]))
isFilterV6 := utils.IsIPv6(net.ParseIP(filterIps[0]))

if !isFilterV4 && !isFilterV6 {
return "", fmt.Errorf("for node %s IPs are neither IPv4 nor IPv6", node.Name)
}

// We need to collect IP address of a matching IP stack for every node that is part of the
// cluster. We need to account for a scenario where Node.Status.Addresses list is incomplete
// and use different source of the address.
//
// We will use here the following sources:
// 1) Node.Status.Addresses list
// 2) Node annotation "k8s.ovn.org/host-addresses" in combination with Machine Networks
//
// If none of those returns a conclusive result, we don't return an IP for this node. This is
// not a desired outcome, but can be extended in the future if desired.

var addr string
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP {
if (utils.IsIPv4(net.ParseIP(address.Address)) && isFilterV4) || (utils.IsIPv6(net.ParseIP(address.Address)) && isFilterV6) {
addr = address.Address
log.Infof("For node %s selected peer address %s using NodeInternalIP", node.Name, addr)
}
}
}
if addr == "" {
log.Infof("For node %s can't find address using NodeInternalIP. Fallback to OVN annotation.", node.Name)

var ovnHostAddresses []string
if err := json.Unmarshal([]byte(node.Annotations["k8s.ovn.org/host-addresses"]), &ovnHostAddresses); err != nil {
log.Warnf("Couldn't unmarshall OVN annotations: '%s'. Skipping.", node.Annotations["k8s.ovn.org/host-addresses"])
}

AddrList:
for _, hostAddr := range ovnHostAddresses {
for _, filterIp := range filterIps {
if hostAddr == filterIp {
log.Infof("Address %s is VIP. Skipping.", hostAddr)
continue AddrList
}
}

if (utils.IsIPv4(net.ParseIP(hostAddr)) && !isFilterV4) || (utils.IsIPv6(net.ParseIP(hostAddr)) && !isFilterV6) {
log.Infof("Address %s doesn't match requested IP stack. Skipping.", hostAddr)
continue
}

match, err := utils.IpInCidr(hostAddr, machineNetwork)
if err != nil {
log.Infof("Address '%s' and subnet '%s' couldn't be parsed. Skipping.", hostAddr, machineNetwork)
continue
}
if match {
addr = hostAddr
log.Infof("For node %s selected peer address %s using using OVN annotations.", node.Name, addr)
}
}
}
return addr, nil
}

// Returns a Node object populated with the configuration specified by the parameters
// to the function.
// kubeconfigPath: The path to a kubeconfig that can be used to read cluster status
Expand Down Expand Up @@ -457,8 +542,7 @@ func getNodeConfig(kubeconfigPath, clusterConfigPath, resolvConfPath string, api

// getSortedBackends builds config to communicate with kube-api based on kubeconfigPath parameter value, if kubeconfigPath is not empty it will build the
// config based on that content else config will point to localhost.
func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.IP) (backends []Backend, err error) {

func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, vips []net.IP) (backends []Backend, err error) {
kubeApiServerUrl := ""
if readFromLocalAPI {
kubeApiServerUrl = localhostKubeApiServerUrl
Expand All @@ -483,19 +567,28 @@ func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.
}).Info("Failed to get master Nodes list")
return []Backend{}, err
}
apiVipv6 := utils.IsIPv6(apiVip)
if len(vips) == 0 {
return []Backend{}, fmt.Errorf("Trying to build config using empty VIPs")
}

// As it is not possible to get cluster's Machine Network directly, we are using a workaround
// by detecting which of the local interfaces belongs to the same subnet as requested VIP.
// This interface can be used to detect what was the original machine network as it contains
// the subnet mask that we need.
machineNetwork, err := utils.GetLocalCIDRByIP(vips[0].String())
if err != nil {
log.WithFields(logrus.Fields{
"err": err,
}).Errorf("Could not retrieve subnet for IP %s", vips[0].String())
return []Backend{}, err
}

for _, node := range nodes.Items {
masterIp := ""
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP && utils.IsIPv6(net.ParseIP(address.Address)) == apiVipv6 {
masterIp = address.Address
break
}
}
if masterIp != "" {
backends = append(backends, Backend{Host: node.ObjectMeta.Name, Address: masterIp})
masterIp, err := getNodeIpForRequestedIpStack(node, utils.ConvertIpsToStrings(vips), machineNetwork)
if err != nil {
log.Warnf("Could not retrieve node's IP for %s. Ignoring", node.ObjectMeta.Name)
} else {
log.Warnf("Could not retrieve node's IP for %s", node.ObjectMeta.Name)
backends = append(backends, Backend{Host: node.ObjectMeta.Name, Address: masterIp})
}
}

Expand All @@ -505,23 +598,27 @@ func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.
return backends, err
}

func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, apiVip net.IP) (ApiLBConfig, error) {
func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, vips []net.IP) (ApiLBConfig, error) {
config := ApiLBConfig{
ApiPort: apiPort,
LbPort: lbPort,
StatPort: statPort,
}

if len(vips) == 0 {
return config, fmt.Errorf("Trying to generate loadbalancer config using empty VIPs")
}

// LB frontend address: IPv6 '::' , IPv4 ''
if apiVip.To4() == nil {
if utils.IsIPv6(vips[0]) {
config.FrontendAddr = "::"
}
// Try reading master nodes details first from api-vip:kube-apiserver and failover to localhost:kube-apiserver
backends, err := getSortedBackends(kubeconfigPath, false, apiVip)
backends, err := getSortedBackends(kubeconfigPath, false, vips)
if err != nil {
log.Infof("An error occurred while trying to read master nodes details from api-vip:kube-apiserver: %v", err)
log.Infof("Trying to read master nodes details from localhost:kube-apiserver")
backends, err = getSortedBackends(kubeconfigPath, true, apiVip)
backends, err = getSortedBackends(kubeconfigPath, true, vips)
if err != nil {
log.WithFields(logrus.Fields{
"kubeconfigPath": kubeconfigPath,
Expand Down
146 changes: 146 additions & 0 deletions pkg/config/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package config

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
testOvnHostAddressesAnnotation = map[string]string{
"k8s.ovn.org/host-addresses": "[\"192.168.1.102\",\"192.168.1.99\",\"192.168.1.101\",\"fd00::101\",\"2001:db8::49a\",\"fd00::102\",\"fd00::5\",\"fd69::2\"]",
}

testNodeDualStack1 = v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "testNode"},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "192.168.1.99"},
{Type: "InternalIP", Address: "fd00::5"},
{Type: "ExternalIP", Address: "172.16.1.99"},
}}}
testNodeDualStack2 = v1.Node{

Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "192.168.1.99"},
{Type: "ExternalIP", Address: "172.16.1.99"},
}},
ObjectMeta: metav1.ObjectMeta{
Name: "testNode",
Annotations: testOvnHostAddressesAnnotation,
},
}
testNodeDualStack3 = v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "testNode",
Annotations: testOvnHostAddressesAnnotation,
},
}
testNodeSingleStackV4 = v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "testNode"},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "192.168.1.99"},
{Type: "ExternalIP", Address: "172.16.1.99"},
}}}
testNodeSingleStackV6 = v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "testNode"},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "fd00::5"},
{Type: "ExternalIP", Address: "2001:db8::49a"},
}}}

testMachineNetworkV4 = "192.168.1.0/24"
testMachineNetworkV6 = "fd00::5/64"
testApiVipV4 = "192.168.1.101"
testApiVipV6 = "fd00::101"
testIngressVipV4 = "192.168.1.102"
testIngressVipV6 = "fd00::102"
)

var _ = Describe("getNodePeersForIpStack", func() {
Context("for dual-stack node", func() {
Context("with address only in status", func() {
It("matches an IPv4 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeDualStack1, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
Expect(err).To(BeNil())
})
It("matches an IPv6 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeDualStack1, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
Expect(err).To(BeNil())
})
})

Context("with address only in OVN annotation", func() {
It("matches an IPv4 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeDualStack3, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
Expect(err).To(BeNil())
})
It("matches an IPv6 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeDualStack3, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
Expect(err).To(BeNil())
})
})

Context("with address in status and OVN annotation", func() {
It("matches an IPv4 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeDualStack2, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
Expect(err).To(BeNil())
})
It("matches an IPv6 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeDualStack2, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
Expect(err).To(BeNil())
})
})
})

Context("for single-stack v4 node", func() {
It("matches an IPv4 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
Expect(err).To(BeNil())
})
It("empty for IPv6 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
Expect(res).To(Equal(""))
Expect(err).To(BeNil())
})
})

Context("for single-stack v6 node", func() {
It("empty for IPv4 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV6, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
Expect(res).To(Equal(""))
Expect(err).To(BeNil())
})
It("matches an IPv6 VIP", func() {
res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV6, []string{testApiVipV6, testIngressVipV6}, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
Expect(err).To(BeNil())
})
})

It("empty for empty node", func() {
res, err := getNodeIpForRequestedIpStack(v1.Node{}, []string{testApiVipV4, testIngressVipV4}, testMachineNetworkV4)
Expect(res).To(Equal(""))
Expect(err).To(BeNil())
})

It("empty for node with IPs and empty VIP requested", func() {
res, err := getNodeIpForRequestedIpStack(testNodeSingleStackV4, []string{}, testMachineNetworkV4)
Expect(res).To(Equal(""))
Expect(err.Error()).To(Equal("for node testNode requested NodeIP detection with empty filterIP list. Cannot detect IP stack"))
})
})

func Test(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Config tests")
}
Loading