From 24b7390a3f312153d2b7307c44e2096c791403b7 Mon Sep 17 00:00:00 2001
From: Michal Rostecki
Date: Fri, 25 Mar 2022 11:43:59 +0100
Subject: [PATCH] netpol: Add dual-stack support

This change allows defining two cluster CIDRs for compatibility with
Kubernetes dual-stack, with the assumption that the two CIDRs are usually
one IPv4 and one IPv6 range.

Signed-off-by: Michal Rostecki
---
 docs/user-guide.md | 2 +
 pkg/cmd/kube-router.go | 37 +-
 .../netpol/network_policy_controller.go | 466 +++++++++++-------
 .../netpol/network_policy_controller_test.go | 338 ++++++++++---
 pkg/controllers/netpol/pod.go | 288 ++++++-----
 pkg/controllers/netpol/policy.go | 328 +++++++-----
 pkg/controllers/netpol/utils.go | 57 ++-
 .../proxy/network_services_controller.go | 6 +-
 pkg/options/options.go | 5 +
 pkg/utils/ipset.go | 53 +-
 pkg/utils/ipset_test.go | 2 +-
 pkg/utils/iptables.go | 110 ++++-
 pkg/utils/node.go | 58 +++
 pkg/utils/pod_cidr.go | 33 ++
 14 files changed, 1247 insertions(+), 536 deletions(-)

diff --git a/docs/user-guide.md b/docs/user-guide.md index 9d27124c8..7015a51bd 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -56,6 +56,8 @@ Usage of kube-router: --disable-source-dest-check Disable the source-dest-check attribute for AWS EC2 instances. When this option is false, it must be set some other way. (default true) --enable-cni Enable CNI plugin. Disable if you want to use kube-router features alongside another CNI plugin. (default true) --enable-ibgp Enables peering with nodes with the same ASN, if disabled will only peer with external BGP peers (default true) + --enable-ipv4 Enables IPv4 support (default true) + --enable-ipv6 Enables IPv6 support (default true) --enable-overlay When enable-overlay is set to true, IP-in-IP tunneling is used for pod-to-pod networking across nodes in different subnets. When set to false no tunneling is used and routing infrastructure is expected to route traffic for pod-to-pod networking across nodes in different subnets (default true) --enable-pod-egress SNAT traffic from Pods to destinations outside the cluster. (default true) --enable-pprof Enables pprof for debugging performance and memory leak issues. 
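For context, dual-stack operation combines the new --enable-ipv4/--enable-ipv6 flags with a comma-separated pair of service cluster-IP CIDRs. A minimal sketch of such an invocation, assuming kube-router's existing --run-firewall and --service-cluster-ip-range flags (the CIDR values are illustrative and mirror the dual-stack test case added below):

    kube-router --run-firewall \
        --enable-ipv4 --enable-ipv6 \
        --service-cluster-ip-range=10.96.0.0/12,2001:db8:42:1::/112
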
diff --git a/pkg/cmd/kube-router.go b/pkg/cmd/kube-router.go index f66ba7742..f77d0501f 100644 --- a/pkg/cmd/kube-router.go +++ b/pkg/cmd/kube-router.go @@ -2,6 +2,7 @@ package cmd import ( "errors" + "fmt" "os" "os/signal" "sync" @@ -14,9 +15,12 @@ import ( "github.com/cloudnativelabs/kube-router/pkg/healthcheck" "github.com/cloudnativelabs/kube-router/pkg/metrics" "github.com/cloudnativelabs/kube-router/pkg/options" + "github.com/cloudnativelabs/kube-router/pkg/utils" "github.com/cloudnativelabs/kube-router/pkg/version" + "github.com/coreos/go-iptables/iptables" "k8s.io/klog/v2" + v1core "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -184,8 +188,39 @@ func (kr *KubeRouter) Run() error { } if kr.Config.RunFirewall { + iptablesCmdHandlers := make(map[v1core.IPFamily]utils.IPTablesHandler, 2) + ipSetHandlers := make(map[v1core.IPFamily]utils.IPSetHandler, 2) + + if kr.Config.EnableIPv4 { + iptHandler, err := iptables.NewWithProtocol(iptables.ProtocolIPv4) + if err != nil { + return fmt.Errorf("failed to create iptables handler: %w", err) + } + iptablesCmdHandlers[v1core.IPv4Protocol] = iptHandler + + ipset, err := utils.NewIPSet(false) + if err != nil { + return fmt.Errorf("failed to create ipset handler: %w", err) + } + ipSetHandlers[v1core.IPv4Protocol] = ipset + } + if kr.Config.EnableIPv6 { + iptHandler, err := iptables.NewWithProtocol(iptables.ProtocolIPv6) + if err != nil { + return fmt.Errorf("failed to create iptables handler: %w", err) + } + iptablesCmdHandlers[v1core.IPv6Protocol] = iptHandler + + ipset, err := utils.NewIPSet(true) + if err != nil { + return fmt.Errorf("failed to create ipset handler: %w", err) + } + ipSetHandlers[v1core.IPv6Protocol] = ipset + } + npc, err := netpol.NewNetworkPolicyController(kr.Client, - kr.Config, podInformer, npInformer, nsInformer, &ipsetMutex) + kr.Config, podInformer, npInformer, nsInformer, &ipsetMutex, + iptablesCmdHandlers, ipSetHandlers) if err != nil { return errors.New("Failed to create network policy controller: " + err.Error()) } diff --git a/pkg/controllers/netpol/network_policy_controller.go b/pkg/controllers/netpol/network_policy_controller.go index 2600fb333..a6acc1357 100644 --- a/pkg/controllers/netpol/network_policy_controller.go +++ b/pkg/controllers/netpol/network_policy_controller.go @@ -15,13 +15,14 @@ import ( "github.com/cloudnativelabs/kube-router/pkg/metrics" "github.com/cloudnativelabs/kube-router/pkg/options" "github.com/cloudnativelabs/kube-router/pkg/utils" - "github.com/coreos/go-iptables/iptables" "github.com/prometheus/client_golang/prometheus" "k8s.io/klog/v2" + v1core "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + netutils "k8s.io/utils/net" ) const ( @@ -62,19 +63,23 @@ var ( // NetworkPolicyController struct to hold information required by NetworkPolicyController type NetworkPolicyController struct { - nodeIP net.IP - nodeHostName string - serviceClusterIPRange net.IPNet - serviceExternalIPRanges []net.IPNet - serviceNodePortRange string - mu sync.Mutex - syncPeriod time.Duration - MetricsEnabled bool - healthChan chan<- *healthcheck.ControllerHeartbeat - fullSyncRequestChan chan struct{} - ipsetMutex *sync.Mutex - - ipSetHandler *utils.IPSet + nodeHostName string + primaryServiceClusterIPRange *net.IPNet + secondaryServiceClusterIPRange *net.IPNet + serviceExternalIPRanges []net.IPNet + serviceNodePortRange string + mu sync.Mutex + syncPeriod time.Duration + MetricsEnabled bool 
+ healthChan chan<- *healthcheck.ControllerHeartbeat + fullSyncRequestChan chan struct{} + ipsetMutex *sync.Mutex + + iptablesCmdHandlers map[v1core.IPFamily]utils.IPTablesHandler + iptablesSaveRestore map[v1core.IPFamily]*utils.IPTablesSaveRestore + filterTableRules map[v1core.IPFamily]*bytes.Buffer + ipSetHandlers map[v1core.IPFamily]utils.IPSetHandler + nodeIPs map[v1core.IPFamily]net.IP podLister cache.Indexer npLister cache.Indexer @@ -83,8 +88,6 @@ type NetworkPolicyController struct { PodEventHandler cache.ResourceEventHandler NamespaceEventHandler cache.ResourceEventHandler NetworkPolicyEventHandler cache.ResourceEventHandler - - filterTableRules bytes.Buffer } // internal structure to represent a network policy @@ -108,7 +111,7 @@ type networkPolicyInfo struct { // internal structure to represent Pod type podInfo struct { - ip string + ips []v1core.PodIP name string namespace string labels map[string]string @@ -121,7 +124,7 @@ type ingressRule struct { namedPorts []endPoints matchAllSource bool srcPods []podInfo - srcIPBlocks [][]string + srcIPBlocks map[v1core.IPFamily][][]string } // internal structure to represent NetworkPolicyEgressRule in the spec @@ -131,7 +134,7 @@ type egressRule struct { namedPorts []endPoints matchAllDestinations bool dstPods []podInfo - dstIPBlocks [][]string + dstIPBlocks map[v1core.IPFamily][][]string } type protocolAndPort struct { @@ -141,7 +144,7 @@ type protocolAndPort struct { } type endPoints struct { - ips []string + ips map[v1core.IPFamily][]string protocolAndPort } @@ -248,15 +251,17 @@ func (npc *NetworkPolicyController) fullPolicySync() { return } - npc.filterTableRules.Reset() - if err := utils.SaveInto("filter", &npc.filterTableRules); err != nil { - klog.Errorf("Aborting sync. Failed to run iptables-save: %v" + err.Error()) - return + for ipFamily, iptablesSaveRestore := range npc.iptablesSaveRestore { + npc.filterTableRules[ipFamily].Reset() + if err := iptablesSaveRestore.SaveInto("filter", npc.filterTableRules[ipFamily]); err != nil { + klog.Errorf("Aborting sync. Failed to run iptables-save: %v", err.Error()) + return + } } activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion) if err != nil { - klog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error()) + klog.Errorf("Aborting sync. Failed to sync network policy chains: %v", err.Error()) return } @@ -272,10 +277,13 @@ func (npc *NetworkPolicyController) fullPolicySync() { return } - if err := utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil { - klog.Errorf("Aborting sync. Failed to run iptables-restore: %v\n%s", - err.Error(), npc.filterTableRules.String()) - return + for ipFamily, iptablesSaveRestore := range npc.iptablesSaveRestore { + if err := iptablesSaveRestore.Restore("filter", + npc.filterTableRules[ipFamily].Bytes()); err != nil { + klog.Errorf("Aborting sync. 
Failed to run iptables-restore: %v\n%s", + err.Error(), npc.filterTableRules[ipFamily].String()) + return + } } err = npc.cleanupStaleIPSets(activePolicyIPSets) @@ -285,6 +293,17 @@ func (npc *NetworkPolicyController) fullPolicySync() { } } +func (npc *NetworkPolicyController) iptablesCmdHandlerForCIDR(cidr net.IPNet) (utils.IPTablesHandler, error) { + if netutils.IsIPv4CIDR(&cidr) { + return npc.iptablesCmdHandlers[v1core.IPv4Protocol], nil + } + if netutils.IsIPv6CIDR(&cidr) { + return npc.iptablesCmdHandlers[v1core.IPv6Protocol], nil + } + + return nil, fmt.Errorf("invalid CIDR") +} + // Creates custom chains KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT // and following rules in the filter table to jump from builtin chain to custom chain // -A INPUT -m comment --comment "kube-router netpol" -j KUBE-ROUTER-INPUT @@ -296,11 +315,6 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { const whitelistUDPNodePortsPosition = 3 const externalIPPositionAdditive = 4 - iptablesCmdHandler, err := iptables.New() - if err != nil { - klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error()) - } - addUUIDForRuleSpec := func(chain string, ruleSpec *[]string) (string, error) { hash := sha256.Sum256([]byte(chain + strings.Join(*ruleSpec, ""))) encoded := base32.StdEncoding.EncodeToString(hash[:])[:16] @@ -314,7 +328,8 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { strings.Join(*ruleSpec, " ")) } - ensureRuleAtPosition := func(chain string, ruleSpec []string, uuid string, position int) { + ensureRuleAtPosition := func(iptablesCmdHandler utils.IPTablesHandler, chain string, ruleSpec []string, + uuid string, position int) { exists, err := iptablesCmdHandler.Exists("filter", chain, ruleSpec...) if err != nil { klog.Fatalf("Failed to verify rule exists in %s chain due to %s", chain, err.Error()) @@ -358,101 +373,133 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { } } - for builtinChain, customChain := range defaultChains { - exists, err := iptablesCmdHandler.ChainExists("filter", customChain) - if err != nil { - klog.Fatalf("failed to check for the existence of chain %s, error: %v", customChain, err) - } - if !exists { - err = iptablesCmdHandler.NewChain("filter", customChain) + for _, iptablesCmdHandler := range npc.iptablesCmdHandlers { + for builtinChain, customChain := range defaultChains { + exists, err := iptablesCmdHandler.ChainExists("filter", customChain) if err != nil { klog.Fatalf("failed to run iptables command to create %s chain due to %s", customChain, err.Error()) } + if !exists { + err = iptablesCmdHandler.NewChain("filter", customChain) + if err != nil { + klog.Fatalf("failed to run iptables command to create %s chain due to %s", customChain, + err.Error()) + } + } + args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain} + uuid, err := addUUIDForRuleSpec(builtinChain, &args) + if err != nil { + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + } + ensureRuleAtPosition(iptablesCmdHandler, builtinChain, args, uuid, 1) } - args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain} - uuid, err := addUUIDForRuleSpec(builtinChain, &args) + } + + if npc.primaryServiceClusterIPRange != nil { + whitelistPrimaryServiceVips := []string{"-m", "comment", "--comment", "allow traffic to primary cluster IP range", + "-d", npc.primaryServiceClusterIPRange.String(), "-j", "RETURN"} + uuid, err := addUUIDForRuleSpec(kubeInputChainName, 
&whitelistPrimaryServiceVips) if err != nil { klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } - ensureRuleAtPosition(builtinChain, args, uuid, 1) + iptablesCmdHandler, err := npc.iptablesCmdHandlerForCIDR(*npc.primaryServiceClusterIPRange) + if err != nil { + klog.Fatalf("Failed to get iptables handler: %s", err.Error()) + } + ensureRuleAtPosition(iptablesCmdHandler, kubeInputChainName, whitelistPrimaryServiceVips, uuid, serviceVIPPosition) + } else { + klog.Fatalf("Primary service cluster IP range is not configured") } - whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", - npc.serviceClusterIPRange.String(), "-j", "RETURN"} - uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) - if err != nil { - klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + if npc.secondaryServiceClusterIPRange != nil { + whitelistSecondaryServiceVips := []string{"-m", "comment", "--comment", "allow traffic to secondary cluster IP range", + "-d", npc.secondaryServiceClusterIPRange.String(), "-j", "RETURN"} + uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistSecondaryServiceVips) + if err != nil { + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + } + iptablesCmdHandler, err := npc.iptablesCmdHandlerForCIDR(*npc.secondaryServiceClusterIPRange) + if err != nil { + klog.Fatalf("Failed to get iptables handler: %s", err.Error()) + } + ensureRuleAtPosition(iptablesCmdHandler, kubeInputChainName, whitelistSecondaryServiceVips, uuid, serviceVIPPosition) } - ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, serviceVIPPosition) - whitelistTCPNodeports := []string{"-p", "tcp", "-m", "comment", "--comment", - "allow LOCAL TCP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", - "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} - uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistTCPNodeports) - if err != nil { - klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) - } - ensureRuleAtPosition(kubeInputChainName, whitelistTCPNodeports, uuid, whitelistTCPNodePortsPosition) + for _, iptablesCmdHandler := range npc.iptablesCmdHandlers { + whitelistTCPNodeports := []string{"-p", "tcp", "-m", "comment", "--comment", + "allow LOCAL TCP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", + "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} + uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistTCPNodeports) + if err != nil { + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + } + ensureRuleAtPosition(iptablesCmdHandler, kubeInputChainName, whitelistTCPNodeports, uuid, + whitelistTCPNodePortsPosition) - whitelistUDPNodeports := []string{"-p", "udp", "-m", "comment", "--comment", - "allow LOCAL UDP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", - "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} - uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistUDPNodeports) - if err != nil { - klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + whitelistUDPNodeports := []string{"-p", "udp", "-m", "comment", "--comment", + "allow LOCAL UDP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", + "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} + uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistUDPNodeports) + if err != nil { + klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) + } + 
ensureRuleAtPosition(iptablesCmdHandler, kubeInputChainName, whitelistUDPNodeports, uuid, + whitelistUDPNodePortsPosition) } - ensureRuleAtPosition(kubeInputChainName, whitelistUDPNodeports, uuid, whitelistUDPNodePortsPosition) for externalIPIndex, externalIPRange := range npc.serviceExternalIPRanges { whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to external IP range: " + externalIPRange.String(), "-d", externalIPRange.String(), "-j", "RETURN"} - uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) + uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) if err != nil { klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } - ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, externalIPIndex+externalIPPositionAdditive) + iptablesCmdHandler, err := npc.iptablesCmdHandlerForCIDR(externalIPRange) + if err != nil { + klog.Fatalf("Failed to get iptables handler: %s", err.Error()) + } + ensureRuleAtPosition(iptablesCmdHandler, kubeInputChainName, whitelistServiceVips, uuid, + externalIPIndex+externalIPPositionAdditive) } } func (npc *NetworkPolicyController) ensureExplicitAccept() { // for the traffic to/from the local pod's let network policy controller be // authoritative entity to ACCEPT the traffic if it complies to network policies - for _, chain := range defaultChains { - args := []string{"-m", "comment", "--comment", "\"explicitly ACCEPT traffic that complies with network policies\"", - "-m", "mark", "--mark", "0x20000/0x20000", "-j", "ACCEPT"} - npc.filterTableRules = utils.AppendUnique(npc.filterTableRules, chain, args) + for _, filterTableRules := range npc.filterTableRules { + for _, chain := range defaultChains { + args := []string{"-m", "comment", "--comment", "\"explicitly ACCEPT traffic that complies with network policies\"", + "-m", "mark", "--mark", "0x20000/0x20000", "-j", "ACCEPT"} + utils.AppendUnique(filterTableRules, chain, args) + } } } // Creates custom chains KUBE-NWPLCY-DEFAULT func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() { + for _, iptablesCmdHandler := range npc.iptablesCmdHandlers { + markArgs := make([]string, 0) + markComment := "rule to mark traffic matching a network policy" + markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, + "--set-xmark", "0x10000/0x10000") - iptablesCmdHandler, err := iptables.New() - if err != nil { - klog.Fatalf("Failed to initialize iptables executor due to %s", err.Error()) - } - - markArgs := make([]string, 0) - markComment := "rule to mark traffic matching a network policy" - markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, - "--set-xmark", "0x10000/0x10000") - - exists, err := iptablesCmdHandler.ChainExists("filter", kubeDefaultNetpolChain) - if err != nil { - klog.Fatalf("failed to check for the existence of chain %s, error: %v", kubeDefaultNetpolChain, err) - } - if !exists { - err = iptablesCmdHandler.NewChain("filter", kubeDefaultNetpolChain) + exists, err := iptablesCmdHandler.ChainExists("filter", kubeDefaultNetpolChain) if err != nil { - klog.Fatalf("failed to run iptables command to create %s chain due to %s", - kubeDefaultNetpolChain, err.Error()) + klog.Fatalf("failed to check for the existence of chain %s, error: %v", kubeDefaultNetpolChain, err) + } + if !exists { + err = iptablesCmdHandler.NewChain("filter", kubeDefaultNetpolChain) + if err != nil { + klog.Fatalf("failed to run iptables command to create %s chain due to %s", + 
kubeDefaultNetpolChain, err.Error()) + } + } + err = iptablesCmdHandler.AppendUnique("filter", kubeDefaultNetpolChain, markArgs...) + if err != nil { + klog.Fatalf("Failed to run iptables command: %s", err.Error()) } - } - err = iptablesCmdHandler.AppendUnique("filter", kubeDefaultNetpolChain, markArgs...) - if err != nil { - klog.Fatalf("Failed to run iptables command: %s", err.Error()) } } @@ -462,88 +509,82 @@ func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, active cleanupPodFwChains := make([]string, 0) cleanupPolicyChains := make([]string, 0) - // initialize tool sets for working with iptables and ipset - iptablesCmdHandler, err := iptables.New() - if err != nil { - return fmt.Errorf("failed to initialize iptables command executor due to %s", err.Error()) - } - - // find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed - chains, err := iptablesCmdHandler.ListChains("filter") - if err != nil { - return fmt.Errorf("unable to list chains: %s", err) - } - for _, chain := range chains { - if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { - if chain == kubeDefaultNetpolChain { - continue - } - if _, ok := activePolicyChains[chain]; !ok { - cleanupPolicyChains = append(cleanupPolicyChains, chain) - continue - } + for ipFamily, iptablesCmdHandler := range npc.iptablesCmdHandlers { + // find iptables chains and ipsets that are no longer used by comparing current to the active maps we were passed + chains, err := iptablesCmdHandler.ListChains("filter") + if err != nil { + return fmt.Errorf("unable to list chains: %w", err) } - if strings.HasPrefix(chain, kubePodFirewallChainPrefix) { - if _, ok := activePodFwChains[chain]; !ok { - cleanupPodFwChains = append(cleanupPodFwChains, chain) - continue + for _, chain := range chains { + if strings.HasPrefix(chain, kubeNetworkPolicyChainPrefix) { + if chain == kubeDefaultNetpolChain { + continue + } + if _, ok := activePolicyChains[chain]; !ok { + cleanupPolicyChains = append(cleanupPolicyChains, chain) + continue + } + } + if strings.HasPrefix(chain, kubePodFirewallChainPrefix) { + if _, ok := activePodFwChains[chain]; !ok { + cleanupPodFwChains = append(cleanupPodFwChains, chain) + continue + } } } - } - var newChains, newRules, desiredFilterTable bytes.Buffer - rules := strings.Split(npc.filterTableRules.String(), "\n") - if len(rules) > 0 && rules[len(rules)-1] == "" { - rules = rules[:len(rules)-1] - } - for _, rule := range rules { - skipRule := false - for _, podFWChainName := range cleanupPodFwChains { - if strings.Contains(rule, podFWChainName) { - skipRule = true - break - } + var newChains, newRules, desiredFilterTable bytes.Buffer + rules := strings.Split(npc.filterTableRules[ipFamily].String(), "\n") + if len(rules) > 0 && rules[len(rules)-1] == "" { + rules = rules[:len(rules)-1] } - for _, policyChainName := range cleanupPolicyChains { - if strings.Contains(rule, policyChainName) { - skipRule = true - break + for _, rule := range rules { + skipRule := false + for _, podFWChainName := range cleanupPodFwChains { + if strings.Contains(rule, podFWChainName) { + skipRule = true + break + } } - } - if deleteDefaultChains { - for _, chain := range []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName, - kubeDefaultNetpolChain} { - if strings.Contains(rule, chain) { + for _, policyChainName := range cleanupPolicyChains { + if strings.Contains(rule, policyChainName) { skipRule = true break } } + if deleteDefaultChains { + for _, chain := 
range []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName, + kubeDefaultNetpolChain} { + if strings.Contains(rule, chain) { + skipRule = true + break + } + } + } + if strings.Contains(rule, "COMMIT") || strings.HasPrefix(rule, "# ") { + skipRule = true + } + if skipRule { + continue + } + if strings.HasPrefix(rule, ":") { + newChains.WriteString(rule + " - [0:0]\n") + } + if strings.HasPrefix(rule, "-") { + newRules.WriteString(rule + "\n") + } } - if strings.Contains(rule, "COMMIT") || strings.HasPrefix(rule, "# ") { - skipRule = true - } - if skipRule { - continue - } - if strings.HasPrefix(rule, ":") { - newChains.WriteString(rule + " - [0:0]\n") - } - if strings.HasPrefix(rule, "-") { - newRules.WriteString(rule + "\n") - } + desiredFilterTable.WriteString("*filter" + "\n") + desiredFilterTable.Write(newChains.Bytes()) + desiredFilterTable.Write(newRules.Bytes()) + desiredFilterTable.WriteString("COMMIT" + "\n") + npc.filterTableRules[ipFamily] = &desiredFilterTable } - desiredFilterTable.WriteString("*filter" + "\n") - desiredFilterTable.Write(newChains.Bytes()) - desiredFilterTable.Write(newRules.Bytes()) - desiredFilterTable.WriteString("COMMIT" + "\n") - npc.filterTableRules = desiredFilterTable return nil } func (npc *NetworkPolicyController) cleanupStaleIPSets(activePolicyIPSets map[string]bool) error { - cleanupPolicyIPSets := make([]*utils.Set, 0) - // There are certain actions like Cleanup() actions that aren't working with full instantiations of the controller // and in these instances the mutex may not be present and may not need to be present as they are operating out of a // single goroutine where there is no need for locking @@ -557,27 +598,25 @@ func (npc *NetworkPolicyController) cleanupStaleIPSets(activePolicyIPSets map[st }() } - ipsets, err := utils.NewIPSet(false) - if err != nil { - return fmt.Errorf("failed to create ipsets command executor due to %s", err.Error()) - } - err = ipsets.Save() - if err != nil { - klog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error()) - } - for _, set := range ipsets.Sets { - if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) || - strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) { - if _, ok := activePolicyIPSets[set.Name]; !ok { - cleanupPolicyIPSets = append(cleanupPolicyIPSets, set) + for _, ipsets := range npc.ipSetHandlers { + cleanupPolicyIPSets := make([]*utils.Set, 0) + + if err := ipsets.Save(); err != nil { + klog.Fatalf("failed to initialize ipsets command executor due to %s", err.Error()) + } + for _, set := range ipsets.Sets() { + if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) || + strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) { + if _, ok := activePolicyIPSets[set.Name]; !ok { + cleanupPolicyIPSets = append(cleanupPolicyIPSets, set) + } } } - } - // cleanup network policy ipsets - for _, set := range cleanupPolicyIPSets { - err = set.Destroy() - if err != nil { - return fmt.Errorf("failed to delete ipset %s due to %s", set.Name, err) + // cleanup network policy ipsets + for _, set := range cleanupPolicyIPSets { + if err := set.Destroy(); err != nil { + return fmt.Errorf("failed to delete ipset %s due to %s", set.Name, err) + } } } return nil @@ -589,9 +628,11 @@ func (npc *NetworkPolicyController) Cleanup() { var emptySet map[string]bool // Take a dump (iptables-save) of the current filter table for cleanupStaleRules() to work on - if err := utils.SaveInto("filter", &npc.filterTableRules); err != nil { - klog.Errorf("error encountered attempting to 
list iptables rules for cleanup: %v", err) - return + for ipFamily, iptablesSaveRestore := range npc.iptablesSaveRestore { + if err := iptablesSaveRestore.SaveInto("filter", npc.filterTableRules[ipFamily]); err != nil { + klog.Errorf("error encountered attempting to list iptables rules for cleanup: %v", err) + return + } } // Run cleanupStaleRules() to get rid of most of the kube-router rules (this is the same logic that runs as // part NPC's runtime loop). Setting the last parameter to true causes even the default chains are removed. @@ -601,10 +642,12 @@ func (npc *NetworkPolicyController) Cleanup() { return } // Restore (iptables-restore) npc's cleaned up version of the iptables filter chain - if err = utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil { - klog.Errorf( - "error encountered while loading running iptables-restore: %v\n%s", err, - npc.filterTableRules.String()) + for ipFamily, iptablesSaveRestore := range npc.iptablesSaveRestore { + if err = iptablesSaveRestore.Restore("filter", npc.filterTableRules[ipFamily].Bytes()); err != nil { + klog.Errorf( + "error encountered while loading running iptables-restore: %v\n%s", err, + npc.filterTableRules[ipFamily].String()) + } } // Cleanup ipsets @@ -621,7 +664,9 @@ func (npc *NetworkPolicyController) Cleanup() { func NewNetworkPolicyController(clientset kubernetes.Interface, config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer, npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, - ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) { + ipsetMutex *sync.Mutex, + iptablesCmdHandlers map[v1core.IPFamily]utils.IPTablesHandler, + ipSetHandlers map[v1core.IPFamily]utils.IPSetHandler) (*NetworkPolicyController, error) { npc := NetworkPolicyController{ipsetMutex: ipsetMutex} // Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time, @@ -630,11 +675,32 @@ func NewNetworkPolicyController(clientset kubernetes.Interface, npc.fullSyncRequestChan = make(chan struct{}, 1) // Validate and parse ClusterIP service range - _, ipnet, err := net.ParseCIDR(config.ClusterIPCIDR) + if config.ClusterIPCIDR == "" { + return nil, fmt.Errorf("parameter --service-cluster-ip is empty") + } + clusterIPCIDRList := strings.Split(config.ClusterIPCIDR, ",") + + if len(clusterIPCIDRList) == 0 { + return nil, fmt.Errorf("failed to get parse --service-cluster-ip-range parameter, the list is empty") + } + + _, primaryIpnet, err := net.ParseCIDR(clusterIPCIDRList[0]) if err != nil { - return nil, fmt.Errorf("failed to get parse --service-cluster-ip-range parameter: %s", err.Error()) + return nil, fmt.Errorf("failed to get parse --service-cluster-ip-range parameter: %w", err) + } + npc.primaryServiceClusterIPRange = primaryIpnet + + if len(clusterIPCIDRList) > 1 { + _, secondaryIpnet, err := net.ParseCIDR(clusterIPCIDRList[1]) + if err != nil { + return nil, fmt.Errorf("failed to get parse --service-cluster-ip-range parameter: %v", err) + } + npc.secondaryServiceClusterIPRange = secondaryIpnet + } + if len(clusterIPCIDRList) > 2 { + return nil, fmt.Errorf("too many CIDRs provided in --service-cluster-ip-range parameter, only two " + + "addresses are allowed at once for dual-stack") } - npc.serviceClusterIPRange = *ipnet // Validate and parse NodePort range if npc.serviceNodePortRange, err = validateNodePortRange(config.NodePortRange); err != nil { @@ -667,11 +733,29 @@ func NewNetworkPolicyController(clientset kubernetes.Interface, npc.nodeHostName = node.Name 
- nodeIP, err := utils.GetNodeIP(node) + nodeIPv4, nodeIPv6, err := utils.GetNodeIPDualStack(node, config.EnableIPv4, config.EnableIPv6) if err != nil { return nil, err } - npc.nodeIP = nodeIP + + npc.iptablesCmdHandlers = iptablesCmdHandlers + npc.iptablesSaveRestore = make(map[v1core.IPFamily]*utils.IPTablesSaveRestore, 2) + npc.filterTableRules = make(map[v1core.IPFamily]*bytes.Buffer, 2) + npc.ipSetHandlers = ipSetHandlers + npc.nodeIPs = make(map[v1core.IPFamily]net.IP, 2) + + if config.EnableIPv4 { + npc.iptablesSaveRestore[v1core.IPv4Protocol] = utils.NewIPTablesSaveRestore(v1core.IPv4Protocol) + var buf bytes.Buffer + npc.filterTableRules[v1core.IPv4Protocol] = &buf + npc.nodeIPs[v1core.IPv4Protocol] = nodeIPv4 + } + if config.EnableIPv6 { + npc.iptablesSaveRestore[v1core.IPv6Protocol] = utils.NewIPTablesSaveRestore(v1core.IPv6Protocol) + var buf bytes.Buffer + npc.filterTableRules[v1core.IPv6Protocol] = &buf + npc.nodeIPs[v1core.IPv6Protocol] = nodeIPv6 + } npc.podLister = podInformer.GetIndexer() npc.PodEventHandler = npc.newPodEventHandler() diff --git a/pkg/controllers/netpol/network_policy_controller_test.go b/pkg/controllers/netpol/network_policy_controller_test.go index 9deab8b91..849d8692c 100644 --- a/pkg/controllers/netpol/network_policy_controller_test.go +++ b/pkg/controllers/netpol/network_policy_controller_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/coreos/go-iptables/iptables" + netv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/intstr" @@ -23,6 +25,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/cloudnativelabs/kube-router/pkg/options" + "github.com/cloudnativelabs/kube-router/pkg/utils" ) // newFakeInformersFromClient creates the different informers used in the uneventful network policy controller @@ -138,14 +141,14 @@ func tNewPodNamespaceMapFromTC(target map[string]string) tPodNamespaceMap { func tCreateFakePods(t *testing.T, podInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) { podNamespaceMap := make(tPodNamespaceMap) pods := []podInfo{ - {name: "Aa", labels: labels.Set{"app": "a"}, namespace: "nsA", ip: "1.1"}, - {name: "Aaa", labels: labels.Set{"app": "a", "component": "a"}, namespace: "nsA", ip: "1.2"}, - {name: "Aab", labels: labels.Set{"app": "a", "component": "b"}, namespace: "nsA", ip: "1.3"}, - {name: "Aac", labels: labels.Set{"app": "a", "component": "c"}, namespace: "nsA", ip: "1.4"}, - {name: "Ba", labels: labels.Set{"app": "a"}, namespace: "nsB", ip: "2.1"}, - {name: "Baa", labels: labels.Set{"app": "a", "component": "a"}, namespace: "nsB", ip: "2.2"}, - {name: "Bab", labels: labels.Set{"app": "a", "component": "b"}, namespace: "nsB", ip: "2.3"}, - {name: "Ca", labels: labels.Set{"app": "a"}, namespace: "nsC", ip: "3.1"}, + {name: "Aa", labels: labels.Set{"app": "a"}, namespace: "nsA", ips: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:cafe:42:1::1"}}}, + {name: "Aaa", labels: labels.Set{"app": "a", "component": "a"}, namespace: "nsA", ips: []v1.PodIP{{IP: "1.2.3.4"}, {IP: "2001:cafe:42:1::2"}}}, + {name: "Aab", labels: labels.Set{"app": "a", "component": "b"}, namespace: "nsA", ips: []v1.PodIP{{IP: "1.3.2.2"}, {IP: "2001:cafe:42:1::3"}}}, + {name: "Aac", labels: labels.Set{"app": "a", "component": "c"}, namespace: "nsA", ips: []v1.PodIP{{IP: "1.4.2.2"}, {IP: "2001:cafe:42:1::4"}}}, + {name: "Ba", labels: labels.Set{"app": "a"}, namespace: "nsB", ips: []v1.PodIP{{IP: "2.1.1.1"}, {IP: "2001:cafe:42:2::1"}}}, + {name: "Baa", labels: 
labels.Set{"app": "a", "component": "a"}, namespace: "nsB", ips: []v1.PodIP{{IP: "2.2.2.2"}, {IP: "2001:cafe:42:2::2"}}}, + {name: "Bab", labels: labels.Set{"app": "a", "component": "b"}, namespace: "nsB", ips: []v1.PodIP{{IP: "2.3.2.2"}, {IP: "2001:cafe:42:2::3"}}}, + {name: "Ca", labels: labels.Set{"app": "a"}, namespace: "nsC", ips: []v1.PodIP{{IP: "3.1"}, {IP: "2001::1"}}}, } namespaces := []tNamespaceMeta{ {name: "nsA", labels: labels.Set{"name": "a", "team": "a"}}, @@ -156,15 +159,16 @@ func tCreateFakePods(t *testing.T, podInformer cache.SharedIndexInformer, nsInfo ipsUsed := make(map[string]bool) for _, pod := range pods { podNamespaceMap.addPod(pod) - ipaddr := "1.1." + pod.ip - if ipsUsed[ipaddr] { - t.Fatalf("there is another pod with the same Ip address %s as this pod %s namespace %s", - ipaddr, pod.name, pod.name) + for _, podIP := range pod.ips { + if ipsUsed[podIP.IP] { + t.Fatalf("there is another pod with the same Ip address %s as this pod %s namespace %s", + podIP, pod.name, pod.name) + } + ipsUsed[podIP.IP] = true + tAddToInformerStore(t, podInformer, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pod.name, Labels: pod.labels, Namespace: pod.namespace}, + Status: v1.PodStatus{PodIP: podIP.IP}}) } - ipsUsed[ipaddr] = true - tAddToInformerStore(t, podInformer, - &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pod.name, Labels: pod.labels, Namespace: pod.namespace}, - Status: v1.PodStatus{PodIP: ipaddr}}) } for _, ns := range namespaces { tAddToInformerStore(t, nsInformer, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns.name, Labels: ns.labels}}) @@ -172,7 +176,11 @@ func tCreateFakePods(t *testing.T, podInformer cache.SharedIndexInformer, nsInfo } // newFakeNode is a helper function for creating Nodes for testing. -func newFakeNode(name string, addr string) *v1.Node { +func newFakeNode(name string, addrs []string) *v1.Node { + addresses := make([]v1.NodeAddress, len(addrs)) + for _, addr := range addrs { + addresses = append(addresses, v1.NodeAddress{Type: v1.NodeInternalIP, Address: addr}) + } return &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: name}, Status: v1.NodeStatus{ @@ -180,7 +188,7 @@ func newFakeNode(name string, addr string) *v1.Node { v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1G"), }, - Addresses: []v1.NodeAddress{{Type: v1.NodeExternalIP, Address: addr}}, + Addresses: addresses, }, } } @@ -192,8 +200,27 @@ func newUneventfulNetworkPolicyController(podInformer cache.SharedIndexInformer, npc := NetworkPolicyController{} npc.syncPeriod = time.Hour + npc.iptablesCmdHandlers = make(map[v1.IPFamily]utils.IPTablesHandler) + npc.iptablesSaveRestore = make(map[v1.IPFamily]*utils.IPTablesSaveRestore) + npc.filterTableRules = make(map[v1.IPFamily]*bytes.Buffer) + npc.ipSetHandlers = make(map[v1.IPFamily]utils.IPSetHandler) + npc.nodeIPs = make(map[v1.IPFamily]net.IP) + + npc.iptablesCmdHandlers[v1.IPv4Protocol] = newFakeIPTables(iptables.ProtocolIPv4) + npc.iptablesSaveRestore[v1.IPv4Protocol] = utils.NewIPTablesSaveRestore(v1.IPv4Protocol) + var bufv4 bytes.Buffer + npc.filterTableRules[v1.IPv4Protocol] = &bufv4 + npc.ipSetHandlers[v1.IPv4Protocol] = &fakeIPSet{} + npc.nodeIPs[v1.IPv4Protocol] = net.IPv4(10, 10, 10, 10) + + npc.iptablesCmdHandlers[v1.IPv6Protocol] = newFakeIPTables(iptables.ProtocolIPv6) + npc.iptablesSaveRestore[v1.IPv6Protocol] = utils.NewIPTablesSaveRestore(v1.IPv6Protocol) + var bufv6 bytes.Buffer + npc.filterTableRules[v1.IPv6Protocol] = &bufv6 + npc.ipSetHandlers[v1.IPv6Protocol] = &fakeIPSet{} + 
npc.nodeIPs[v1.IPv6Protocol] = net.ParseIP("2001:1b74:88:9400::62:62") + npc.nodeHostName = "node" - npc.nodeIP = net.IPv4(10, 10, 10, 10) npc.podLister = podInformer.GetIndexer() npc.nsLister = nsInformer.GetIndexer() npc.npLister = npInformer.GetIndexer() @@ -205,12 +232,13 @@ func newUneventfulNetworkPolicyController(podInformer cache.SharedIndexInformer, // the expected selected targets (targetPods, inSourcePods for ingress targets, and outDestPods // for egress targets) as maps with key being the namespace and a csv of pod names type tNetpolTestCase struct { - name string - netpol tNetpol - targetPods tPodNamespaceMap - inSourcePods tPodNamespaceMap - outDestPods tPodNamespaceMap - expectedRule string + name string + netpol tNetpol + targetPods tPodNamespaceMap + inSourcePods tPodNamespaceMap + outDestPods tPodNamespaceMap + expectedRuleV4 string + expectedRuleV6 string } // tGetNotTargetedPods finds set of pods that should not be targeted by netpol selectors @@ -369,7 +397,8 @@ func TestNewNetworkPolicySelectors(t *testing.T) { }, } - client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", "10.10.10.10")}}) + client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", []string{"10.10.10.10", + "2001:1b74:88:9400::62:62"})}}) informerFactory, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -382,7 +411,7 @@ func TestNewNetworkPolicySelectors(t *testing.T) { } netpols, err := krNetPol.buildNetworkPoliciesInfo() if err != nil { - t.Errorf("Problems building policies") + t.Errorf("Problems building policies: %v", err) } for _, test := range testCases { @@ -426,8 +455,10 @@ func TestNetworkPolicyBuilder(t *testing.T) { }, }, }, - expectedRule: "-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + - "-A KUBE-NWPLCY-QHFGOTFJZFXUJVTH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV4: "-A KUBE-NWPLCY-C23KD7UE4TAT3Y5M -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-C23KD7UE4TAT3Y5M -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV6: "-A KUBE-NWPLCY-YQCH3LHZFXYDUDPO -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-YQCH3LHZFXYDUDPO -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n", }, { name: "Simple Ingress/Egress Destination Port", @@ -460,10 +491,14 @@ func TestNetworkPolicyBuilder(t *testing.T) { }, }, }, - expectedRule: "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress 
namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + - "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + - "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -j MARK --set-xmark 0x10000/0x10000 \n" + - "-A KUBE-NWPLCY-KO52PWL34ABMMBI7 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV4: "-A KUBE-NWPLCY-IDIX352DRLNY3D23 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-IDIX352DRLNY3D23 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + + "-A KUBE-NWPLCY-IDIX352DRLNY3D23 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-IDIX352DRLNY3D23 -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV6: "-A KUBE-NWPLCY-X6HAEA5FXW4H6SCU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-X6HAEA5FXW4H6SCU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-ingress-egress namespace nsA\" --dport 30000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + + "-A KUBE-NWPLCY-X6HAEA5FXW4H6SCU -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-X6HAEA5FXW4H6SCU -m comment --comment \"rule to ACCEPT traffic from all sources to dest pods selected by policy name: simple-ingress-egress namespace nsA\" --dport 37000 -m mark --mark 0x10000/0x10000 -j RETURN \n", }, { name: "Simple Egress Destination Port Range", @@ -492,10 +527,14 @@ func TestNetworkPolicyBuilder(t *testing.T) { }, }, }, - expectedRule: "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -j MARK --set-xmark 0x10000/0x10000 \n" + - "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + - "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -j MARK --set-xmark 0x10000/0x10000 \n" 
+ - "-A KUBE-NWPLCY-SQYQ7PVNG6A6Q3DU -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV4: "-A KUBE-NWPLCY-2UTXQIFBI5TAPUCL -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-2UTXQIFBI5TAPUCL -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + + "-A KUBE-NWPLCY-2UTXQIFBI5TAPUCL -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-2UTXQIFBI5TAPUCL -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV6: "-A KUBE-NWPLCY-33RFHPW3DDYFHOGH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-33RFHPW3DDYFHOGH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 30000:31000 -m mark --mark 0x10000/0x10000 -j RETURN \n" + + "-A KUBE-NWPLCY-33RFHPW3DDYFHOGH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-33RFHPW3DDYFHOGH -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: simple-egress-pr namespace nsA\" --dport 34000:35000 -m mark --mark 0x10000/0x10000 -j RETURN \n", }, { name: "Port > EndPort (invalid condition, should drop endport)", @@ -520,12 +559,15 @@ func TestNetworkPolicyBuilder(t *testing.T) { }, }, }, - expectedRule: "-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -j MARK --set-xmark 0x10000/0x10000 \n" + - "-A KUBE-NWPLCY-2A4DPWPR5REBS66I -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV4: "-A KUBE-NWPLCY-N5DQE4SCQ56JEMH7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -j MARK --set-xmark 0x10000/0x10000 \n" + + "-A KUBE-NWPLCY-N5DQE4SCQ56JEMH7 -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -m mark --mark 0x10000/0x10000 -j RETURN \n", + expectedRuleV6: "-A KUBE-NWPLCY-SE73PD4VXO44WFJC -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -j MARK --set-xmark 0x10000/0x10000 \n" + 
+ "-A KUBE-NWPLCY-SE73PD4VXO44WFJC -m comment --comment \"rule to ACCEPT traffic from source pods to all destinations selected by policy name: invalid-endport namespace nsA\" --dport 34000 -m mark --mark 0x10000/0x10000 -j RETURN \n", }, } - client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", "10.10.10.10")}}) + client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", []string{ + "10.10.10.10", "2001:1b74:88:9400::62:62"})}}) informerFactory, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -541,22 +583,29 @@ func TestNetworkPolicyBuilder(t *testing.T) { } for _, np := range netpols { fmt.Printf(np.policyType) - if np.policyType == kubeEgressPolicyType || np.policyType == kubeBothPolicyType { - err = krNetPol.processEgressRules(np, "", nil, "1") - if err != nil { - t.Errorf("Error syncing the rules: %s", err) + for ipFamily := range krNetPol.filterTableRules { + if np.policyType == kubeEgressPolicyType || np.policyType == kubeBothPolicyType { + err = krNetPol.processEgressRules(np, "", nil, "1", ipFamily) + if err != nil { + t.Errorf("Error syncing the rules: %s", err) + } } - } - if np.policyType == kubeIngressPolicyType || np.policyType == kubeBothPolicyType { - err = krNetPol.processIngressRules(np, "", nil, "1") - if err != nil { - t.Errorf("Error syncing the rules: %s", err) + if np.policyType == kubeIngressPolicyType || np.policyType == kubeBothPolicyType { + err = krNetPol.processIngressRules(np, "", nil, "1", ipFamily) + if err != nil { + t.Errorf("Error syncing the rules: %s", err) + } } } } - if !bytes.Equal([]byte(test.expectedRule), krNetPol.filterTableRules.Bytes()) { - t.Errorf("Invalid rule %s created:\nExpected:\n%s \nGot:\n%s", test.name, test.expectedRule, krNetPol.filterTableRules.String()) + if !bytes.Equal([]byte(test.expectedRuleV4), krNetPol.filterTableRules[v1.IPv4Protocol].Bytes()) { + t.Errorf("Invalid IPv4 rule %s created:\nExpected:\n%s \nGot:\n%s", + test.name, test.expectedRuleV4, krNetPol.filterTableRules[v1.IPv4Protocol].String()) + } + if !bytes.Equal([]byte(test.expectedRuleV6), krNetPol.filterTableRules[v1.IPv6Protocol].Bytes()) { + t.Errorf("Invalid IPv6 rule %s created:\nExpected:\n%s \nGot:\n%s", + test.name, test.expectedRuleV6, krNetPol.filterTableRules[v1.IPv6Protocol].String()) } key := fmt.Sprintf("%s/%s", test.netpol.namespace, test.netpol.name) obj, exists, err := krNetPol.npLister.GetByKey(key) @@ -569,10 +618,154 @@ func TestNetworkPolicyBuilder(t *testing.T) { t.Errorf("Failed to remove Netpol from store: %s", err) } } - krNetPol.filterTableRules.Reset() - + for _, filterTableRules := range krNetPol.filterTableRules { + filterTableRules.Reset() + } } +} + +type fakeIPTables struct { + protocol iptables.Protocol +} + +func newFakeIPTables(protocol iptables.Protocol) *fakeIPTables { + return &fakeIPTables{protocol} +} + +func (ipt *fakeIPTables) Proto() iptables.Protocol { + return ipt.protocol +} + +func (ipt *fakeIPTables) Exists(table, chain string, rulespec ...string) (bool, error) { + return true, nil +} + +func (ipt *fakeIPTables) Insert(table, chain string, pos int, rulespec ...string) error { + return nil +} + +func (ipt *fakeIPTables) Append(table, chain string, rulespec ...string) error { + return nil +} + +func (ipt *fakeIPTables) AppendUnique(table, chain string, rulespec ...string) error { + return nil +} + +func (ipt *fakeIPTables) Delete(table, chain string, rulespec 
...string) error { + return nil +} + +func (ipt *fakeIPTables) DeleteIfExists(table, chain string, rulespec ...string) error { + return nil +} + +func (ipt *fakeIPTables) List(table, chain string) ([]string, error) { + return nil, nil +} + +func (ipt *fakeIPTables) ListWithCounters(table, chain string) ([]string, error) { + return nil, nil +} + +func (ipt *fakeIPTables) ListChains(table string) ([]string, error) { + return nil, nil +} + +func (ipt *fakeIPTables) ChainExists(table, chain string) (bool, error) { + return true, nil +} + +func (ipt *fakeIPTables) Stats(table, chain string) ([][]string, error) { + return nil, nil +} + +func (ipt *fakeIPTables) ParseStat(stat []string) (iptables.Stat, error) { + return iptables.Stat{}, nil +} + +func (ipt *fakeIPTables) StructuredStats(table, chain string) ([]iptables.Stat, error) { + return nil, nil +} + +func (ipt *fakeIPTables) NewChain(table, chain string) error { + return nil +} + +func (ipt *fakeIPTables) ClearChain(table, chain string) error { + return nil +} + +func (ipt *fakeIPTables) RenameChain(table, oldChain, newChain string) error { + return nil +} + +func (ipt *fakeIPTables) DeleteChain(table, chain string) error { + return nil +} + +func (ipt *fakeIPTables) ClearAndDeleteChain(table, chain string) error { + return nil +} + +func (ipt *fakeIPTables) ClearAll() error { + return nil +} + +func (ipt *fakeIPTables) DeleteAll() error { + return nil +} + +func (ipt *fakeIPTables) ChangePolicy(table, chain, target string) error { + return nil +} + +func (ipt *fakeIPTables) HasRandomFully() bool { + return true +} +func (ipt *fakeIPTables) GetIptablesVersion() (int, int, int) { + return 1, 8, 0 +} + +type fakeIPSet struct{} + +func (ips *fakeIPSet) Create(setName string, createOptions ...string) (*utils.Set, error) { + return nil, nil +} + +func (ips *fakeIPSet) Add(set *utils.Set) error { + return nil +} + +func (ips *fakeIPSet) RefreshSet(setName string, entriesWithOptions [][]string, setType string) {} + +func (ips *fakeIPSet) Destroy(setName string) error { + return nil +} + +func (ips *fakeIPSet) DestroyAllWithin() error { + return nil +} + +func (ips *fakeIPSet) Save() error { + return nil +} + +func (ips *fakeIPSet) Restore() error { + return nil +} + +func (ips *fakeIPSet) Flush() error { + return nil +} + +func (ips *fakeIPSet) Get(setName string) *utils.Set { + return nil +} + +func (ips *fakeIPSet) Sets() map[string]*utils.Set { + return nil } func TestNetworkPolicyController(t *testing.T) { @@ -607,6 +800,18 @@ func TestNetworkPolicyController(t *testing.T) { true, "failed to get parse --service-cluster-ip-range parameter: invalid CIDR address: 10.10.10.10", }, + { + "Test bad cluster CIDRs (using more than 2 ip addresses, including 2 ipv4)", + newMinimalKubeRouterConfig("10.96.0.0/12,10.244.0.0/16,2001:db8:42:1::/112", "", "node", nil), + true, + "too many CIDRs provided in --service-cluster-ip-range parameter, only two addresses are allowed at once for dual-stack", + }, + { + "Test bad cluster CIDRs (using more than 2 ip addresses, including 2 ipv6)", + newMinimalKubeRouterConfig("10.96.0.0/12,2001:db8:42:0::/56,2001:db8:42:1::/112", "", "node", nil), + true, + "too many CIDRs provided in --service-cluster-ip-range parameter, only two addresses are allowed at once for dual-stack", + }, { "Test good cluster CIDR (using single IP with a /32)", newMinimalKubeRouterConfig("10.10.10.10/32", "", "node", nil), @@ -619,6 +824,18 @@ func TestNetworkPolicyController(t *testing.T) { false, "", }, + { + "Test good cluster CIDR (using 
ipv6)", + newMinimalKubeRouterConfig("2001:db8:42:1::/112", "", "node", nil), + false, + "", + }, + { + "Test good cluster CIDRs (with dual-stack)", + newMinimalKubeRouterConfig("10.96.0.0/12,2001:db8:42:1::/112", "", "node", nil), + false, + "", + }, { "Test bad node port specification (using commas)", newMinimalKubeRouterConfig("", "8080,8081", "node", nil), @@ -698,11 +915,22 @@ func TestNetworkPolicyController(t *testing.T) { "", }, } - client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", "10.10.10.10")}}) + client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*newFakeNode("node", []string{ + "10.10.10.10", + "2001:1b74:88:9400::62:62"})}}) _, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client) for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - _, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{}) + // TODO: Handle IPv6 + iptablesHandlers := make(map[v1.IPFamily]utils.IPTablesHandler, 2) + iptablesHandlers[v1.IPv4Protocol] = newFakeIPTables(iptables.ProtocolIPv4) + iptablesHandlers[v1.IPv6Protocol] = newFakeIPTables(iptables.ProtocolIPv6) + ipSetHandlers := make(map[v1.IPFamily]utils.IPSetHandler, 2) + ipSetHandlers[v1.IPv4Protocol] = &fakeIPSet{} + ipSetHandlers[v1.IPv6Protocol] = &fakeIPSet{} + + _, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{}, + iptablesHandlers, ipSetHandlers) if err == nil && test.expectError { t.Error("This config should have failed, but it was successful instead") } else if err != nil { diff --git a/pkg/controllers/netpol/pod.go b/pkg/controllers/netpol/pod.go index 63043346a..cde621d98 100644 --- a/pkg/controllers/netpol/pod.go +++ b/pkg/controllers/netpol/pod.go @@ -79,36 +79,44 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo [] activePodFwChains := make(map[string]bool) dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) { - // add rule to log the packets that will be dropped due to network policy enforcement - comment := "\"rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace + "\"" - args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, - "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", - "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"} - // This used to be AppendUnique when we were using iptables directly, this checks to make sure we didn't drop - // unmarked for this chain already - if strings.Contains(npc.filterTableRules.String(), strings.Join(args, " ")) { - return - } - npc.filterTableRules.WriteString(strings.Join(args, " ")) + for _, filterTableRules := range npc.filterTableRules { + // add rule to log the packets that will be dropped due to network policy enforcement + comment := "\"rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace + "\"" + args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, + "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", + "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"} + // This used to be AppendUnique when we were using iptables directly, this checks to make sure we didn't drop + // unmarked for this chain already + if strings.Contains(filterTableRules.String(), strings.Join(args, " ")) { + return + } + 
filterTableRules.WriteString(strings.Join(args, " ")) - // add rule to DROP if no applicable network policy permits the traffic - comment = "\"rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace + "\"" - args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, - "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) + // add rule to DROP if no applicable network policy permits the traffic + comment = "\"rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace + "\"" + args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, + "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) - // reset mark to let traffic pass through rest of the chains - args = []string{"-A", podFwChainName, "-j", "MARK", "--set-mark", "0/0x10000", "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) + // reset mark to let traffic pass through rest of the chains + args = []string{"-A", podFwChainName, "-j", "MARK", "--set-mark", "0/0x10000", "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + } } // loop through the pods running on the node - allLocalPods := npc.getLocalPods(npc.nodeIP.String()) - for _, pod := range *allLocalPods { + allLocalPods := make(map[string]podInfo) + for _, nodeIP := range npc.nodeIPs { + npc.getLocalPods(allLocalPods, nodeIP.String()) + break + } + for _, pod := range allLocalPods { // ensure pod specific firewall chain exist for all the pods that need ingress firewall podFwChainName := podFirewallChainName(pod.namespace, pod.name, version) - npc.filterTableRules.WriteString(":" + podFwChainName + "\n") + for _, filterTableRules := range npc.filterTableRules { + filterTableRules.WriteString(":" + podFwChainName + "\n") + } activePodFwChains[podFwChainName] = true @@ -123,12 +131,14 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo [] dropUnmarkedTrafficRules(pod.name, pod.namespace, podFwChainName) - // set mark to indicate traffic from/to the pod passed network policies. - // Mark will be checked to explicitly ACCEPT the traffic - comment := "\"set mark to ACCEPT traffic that comply to network policies\"" - args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, - "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) + for _, filterTableRules := range npc.filterTableRules { + // set mark to indicate traffic from/to the pod passed network policies. 
+ // Mark will be checked to explicitly ACCEPT the traffic + comment := "\"set mark to ACCEPT traffic that comply to network policies\"" + args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, + "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + } } return activePodFwChains @@ -141,138 +151,154 @@ func (npc *NetworkPolicyController) setupPodNetpolRules(pod podInfo, podFwChainN hasIngressPolicy := false hasEgressPolicy := false - // add entries in pod firewall to run through applicable network policies - for _, policy := range networkPoliciesInfo { - if _, ok := policy.targetPods[pod.ip]; !ok { - continue + for ipFamily, filterTableRules := range npc.filterTableRules { + var ip string + switch ipFamily { + case api.IPv4Protocol: + ip, _ = getPodIPv4Address(pod) + case api.IPv6Protocol: + ip, _ = getPodIPv6Address(pod) } - comment := "\"run through nw policy " + policy.name + "\"" - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) - var args []string - switch policy.policyType { - case kubeBothPolicyType: - hasIngressPolicy = true - hasEgressPolicy = true - args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, - "-j", policyChainName, "\n"} - case kubeIngressPolicyType: - hasIngressPolicy = true - args = []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, - "-j", policyChainName, "\n"} - case kubeEgressPolicyType: - hasEgressPolicy = true - args = []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, - "-j", policyChainName, "\n"} + // add entries in pod firewall to run through applicable network policies + for _, policy := range networkPoliciesInfo { + // TODO: Take the ipv4 address, pod.ips[0] is not good + if _, ok := policy.targetPods[pod.ips[0].IP]; !ok { + continue + } + comment := "\"run through nw policy " + policy.name + "\"" + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version, ipFamily) + var args []string + switch policy.policyType { + case kubeBothPolicyType: + hasIngressPolicy = true + hasEgressPolicy = true + args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, + "-j", policyChainName, "\n"} + case kubeIngressPolicyType: + hasIngressPolicy = true + args = []string{"-I", podFwChainName, "1", "-d", ip, "-m", "comment", "--comment", comment, + "-j", policyChainName, "\n"} + case kubeEgressPolicyType: + hasEgressPolicy = true + args = []string{"-I", podFwChainName, "1", "-s", ip, "-m", "comment", "--comment", comment, + "-j", policyChainName, "\n"} + } + filterTableRules.WriteString(strings.Join(args, " ")) } - npc.filterTableRules.WriteString(strings.Join(args, " ")) - } - // if pod does not have any network policy which applies rules for pod's ingress traffic - // then apply default network policy - if !hasIngressPolicy { - comment := "\"run through default ingress network policy chain\"" - args := []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, - "-j", kubeDefaultNetpolChain, "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) - } + // if pod does not have any network policy which applies rules for pod's ingress traffic + // then apply default network policy + if !hasIngressPolicy { + comment := "\"run through default ingress network policy chain\"" + args := []string{"-I", podFwChainName, "1", "-d", ip, "-m", "comment", "--comment", comment, + "-j", 
kubeDefaultNetpolChain, "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + } - // if pod does not have any network policy which applies rules for pod's egress traffic - // then apply default network policy - if !hasEgressPolicy { - comment := "\"run through default egress network policy chain\"" - args := []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, - "-j", kubeDefaultNetpolChain, "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) - } + // if pod does not have any network policy which applies rules for pod's egress traffic + // then apply default network policy + if !hasEgressPolicy { + comment := "\"run through default egress network policy chain\"" + args := []string{"-I", podFwChainName, "1", "-s", ip, "-m", "comment", "--comment", comment, + "-j", kubeDefaultNetpolChain, "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + } - comment := "\"rule to permit the traffic to pods when source is the pod's local node\"" - args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, - "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) - - // ensure statefull firewall drops INVALID state traffic from/to the pod - // For full context see: https://bugzilla.netfilter.org/show_bug.cgi?id=693 - // The NAT engine ignores any packet with state INVALID, because there's no reliable way to determine what kind of - // NAT should be performed. So the proper way to prevent the leakage is to drop INVALID packets. - // In the future, if we ever allow services or nodes to disable conntrack checking, we may need to make this - // conditional so that non-tracked traffic doesn't get dropped as invalid. 
- comment = "\"rule to drop invalid state for pod\"" - args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, - "-m", "conntrack", "--ctstate", "INVALID", "-j", "DROP", "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) - - // ensure statefull firewall that permits RELATED,ESTABLISHED traffic from/to the pod - comment = "\"rule for stateful firewall for pod\"" - args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, - "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) + comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\"" + args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, + "-m", "addrtype", "--src-type", "LOCAL", "-d", ip, "-j", "ACCEPT", "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + // ensure statefull firewall that permits RELATED,ESTABLISHED traffic from/to the pod + comment = "\"rule for stateful firewall for pod\"" + args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, + "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + } } func (npc *NetworkPolicyController) interceptPodInboundTraffic(pod podInfo, podFwChainName string) { - // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain - // this rule applies to the traffic getting routed (coming for other node pods) - comment := "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + - " to chain " + podFwChainName + "\"" - args := []string{"-A", kubeForwardChainName, "-m", "comment", "--comment", comment, "-d", pod.ip, - "-j", podFwChainName + "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) - - // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain - // this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy - args = []string{"-A", kubeOutputChainName, "-m", "comment", "--comment", comment, "-d", pod.ip, - "-j", podFwChainName + "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) - - // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain - // this rule applies to the traffic getting switched (coming for same node pods) - comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + - " to chain " + podFwChainName + "\"" - args = []string{"-A", kubeForwardChainName, "-m", "physdev", "--physdev-is-bridged", - "-m", "comment", "--comment", comment, - "-d", pod.ip, - "-j", podFwChainName, "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) + for ipFamily, filterTableRules := range npc.filterTableRules { + var ip string + switch ipFamily { + case api.IPv4Protocol: + ip, _ = getPodIPv4Address(pod) + case api.IPv6Protocol: + ip, _ = getPodIPv6Address(pod) + } + + // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain + // this rule applies to the traffic getting routed (coming for other node pods) + comment := "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + "\"" + args := []string{"-A", kubeForwardChainName, "-m", "comment", "--comment", comment, "-d", ip, + "-j", 
podFwChainName + "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + + // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain + // this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy + args = []string{"-A", kubeOutputChainName, "-m", "comment", "--comment", comment, "-d", ip, + "-j", podFwChainName + "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + + // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain + // this rule applies to the traffic getting switched (coming for same node pods) + comment = "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + "\"" + args = []string{"-A", kubeForwardChainName, "-m", "physdev", "--physdev-is-bridged", + "-m", "comment", "--comment", comment, + "-d", ip, + "-j", podFwChainName, "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + } } // setup iptable rules to intercept outbound traffic from pods and run it across the // firewall chain corresponding to the pod so that egress network policies are enforced func (npc *NetworkPolicyController) interceptPodOutboundTraffic(pod podInfo, podFwChainName string) { - for _, chain := range defaultChains { - // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain - // this rule applies to the traffic getting forwarded/routed (traffic from the pod destined - // to pod on a different node) + for ipFamily, filterTableRules := range npc.filterTableRules { + var ip string + switch ipFamily { + case api.IPv4Protocol: + ip, _ = getPodIPv4Address(pod) + case api.IPv6Protocol: + ip, _ = getPodIPv6Address(pod) + } + + for _, chain := range defaultChains { + // ensure there is rule in filter table and FORWARD chain to jump to pod specific firewall chain + // this rule applies to the traffic getting forwarded/routed (traffic from the pod destined + // to pod on a different node) + comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + + " to chain " + podFwChainName + "\"" + args := []string{"-A", chain, "-m", "comment", "--comment", comment, "-s", ip, "-j", podFwChainName, "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) + } + + // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain + // this rule applies to the traffic getting switched (coming for same node pods) comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + " to chain " + podFwChainName + "\"" - args := []string{"-A", chain, "-m", "comment", "--comment", comment, "-s", pod.ip, "-j", podFwChainName, "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) + args := []string{"-A", kubeForwardChainName, "-m", "physdev", "--physdev-is-bridged", + "-m", "comment", "--comment", comment, + "-s", ip, + "-j", podFwChainName, "\n"} + filterTableRules.WriteString(strings.Join(args, " ")) } - - // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain - // this rule applies to the traffic getting switched (coming for same node pods) - comment := "\"rule to jump traffic from POD name:" + pod.name + " namespace: " + pod.namespace + - " to chain " + podFwChainName + "\"" - args := []string{"-A", kubeForwardChainName, "-m", "physdev", "--physdev-is-bridged", - "-m", "comment", "--comment", comment, - "-s", 
pod.ip, - "-j", podFwChainName, "\n"} - npc.filterTableRules.WriteString(strings.Join(args, " ")) } -func (npc *NetworkPolicyController) getLocalPods(nodeIP string) *map[string]podInfo { - localPods := make(map[string]podInfo) +func (npc *NetworkPolicyController) getLocalPods(localPods map[string]podInfo, nodeIP string) { for _, obj := range npc.podLister.List() { pod := obj.(*api.Pod) // ignore the pods running on the different node and pods that are not actionable if strings.Compare(pod.Status.HostIP, nodeIP) != 0 || !isNetPolActionable(pod) { continue } - localPods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP, + localPods[pod.Status.PodIP] = podInfo{ips: pod.Status.PodIPs, name: pod.ObjectMeta.Name, namespace: pod.ObjectMeta.Namespace, labels: pod.ObjectMeta.Labels} } - return &localPods } func podFirewallChainName(namespace, podName string, version string) string { diff --git a/pkg/controllers/netpol/policy.go b/pkg/controllers/netpol/policy.go index aed41b86a..b34d624b1 100644 --- a/pkg/controllers/netpol/policy.go +++ b/pkg/controllers/netpol/policy.go @@ -19,6 +19,7 @@ import ( listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + utilsnet "k8s.io/utils/net" ) func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler { @@ -85,60 +86,69 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo klog.V(1).Infof("Returned ipset mutex lock") }() - ipset, err := utils.NewIPSet(false) - if err != nil { - return nil, nil, err - } - err = ipset.Save() - if err != nil { - return nil, nil, err - } - npc.ipSetHandler = ipset - activePolicyChains := make(map[string]bool) activePolicyIPSets := make(map[string]bool) + // for ipFamily, ipset := range npc.ipSetHandlers { // run through all network policies for _, policy := range networkPoliciesInfo { - // ensure there is a unique chain per network policy in filter table - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) - npc.filterTableRules.WriteString(":" + policyChainName + "\n") + currentPodIPs := make(map[api.IPFamily][]string) + for _, pod := range policy.targetPods { + for _, ip := range pod.ips { + if utilsnet.IsIPv4String(ip.IP) { + currentPodIPs[api.IPv4Protocol] = append(currentPodIPs[api.IPv4Protocol], ip.IP) + } + if utilsnet.IsIPv6String(ip.IP) { + currentPodIPs[api.IPv6Protocol] = append(currentPodIPs[api.IPv6Protocol], ip.IP) + } + } + } + + for ipFamily, ipset := range npc.ipSetHandlers { + // ensure there is a unique chain per network policy in filter table + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version, ipFamily) - activePolicyChains[policyChainName] = true + npc.filterTableRules[ipFamily].WriteString(":" + policyChainName + "\n") - currentPodIPs := make([]string, 0, len(policy.targetPods)) - for ip := range policy.targetPods { - currentPodIPs = append(currentPodIPs, ip) - } + activePolicyChains[policyChainName] = true - if policy.policyType == kubeBothPolicyType || policy.policyType == kubeIngressPolicyType { - // create a ipset for all destination pod ip's matched by the policy spec PodSelector - targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name) - npc.createGenericHashIPSet(targetDestPodIPSetName, utils.TypeHashIP, currentPodIPs) - err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version) - if err != nil { - return nil, nil, err + if policy.policyType == kubeBothPolicyType || 
policy.policyType == kubeIngressPolicyType { + // create a ipset for all destination pod ip's matched by the policy spec PodSelector + targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name, ipFamily) + setEntries := make([][]string, 0) + for _, podIP := range currentPodIPs[ipFamily] { + setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"}) + } + ipset.RefreshSet(targetDestPodIPSetName, setEntries, utils.TypeHashIP) + if err := npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, + version, ipFamily); err != nil { + return nil, nil, err + } + activePolicyIPSets[targetDestPodIPSetName] = true + } + if policy.policyType == kubeBothPolicyType || policy.policyType == kubeEgressPolicyType { + // create a ipset for all source pod ip's matched by the policy spec PodSelector + targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name, ipFamily) + setEntries := make([][]string, 0) + for _, podIP := range currentPodIPs[ipFamily] { + setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"}) + } + ipset.RefreshSet(targetSourcePodIPSetName, setEntries, utils.TypeHashIP) + if err := npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, + version, ipFamily); err != nil { + return nil, nil, err + } + activePolicyIPSets[targetSourcePodIPSetName] = true } - activePolicyIPSets[targetDestPodIPSetName] = true - } - if policy.policyType == kubeBothPolicyType || policy.policyType == kubeEgressPolicyType { - // create a ipset for all source pod ip's matched by the policy spec PodSelector - targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name) - npc.createGenericHashIPSet(targetSourcePodIPSetName, utils.TypeHashIP, currentPodIPs) - err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version) + + err := ipset.Restore() if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to perform ipset restore: %w", err) } - activePolicyIPSets[targetSourcePodIPSetName] = true } } - err = npc.ipSetHandler.Restore() - if err != nil { - return nil, nil, fmt.Errorf("failed to perform ipset restore: %s", err.Error()) - } - klog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.") return activePolicyChains, activePolicyIPSets, nil @@ -146,7 +156,8 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo //nolint:dupl // This is as simple as this function gets even though it repeats some of processEgressRules func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo, - targetDestPodIPSetName string, activePolicyIPSets map[string]bool, version string) error { + targetDestPodIPSetName string, activePolicyIPSets map[string]bool, version string, + ipFamily api.IPFamily) error { // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " // so no whitelist rules to be added to the network policy @@ -154,23 +165,23 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo return nil } - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version, ipFamily) // run through all the ingress rules in the spec and create iptables rules // in the chain for the network policy for ruleIdx, ingressRule := range policy.ingressRules { if len(ingressRule.srcPods) != 0 
{ - srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, ruleIdx) + srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, ruleIdx, ipFamily) // Create policy based ipset with source pod IPs npc.createPolicyIndexedIPSet(activePolicyIPSets, srcPodIPSetName, utils.TypeHashIP, - getIPsFromPods(ingressRule.srcPods)) + getIPsFromPods(ingressRule.srcPods, ipFamily), ipFamily) // If the ingress policy contains port declarations, we need to make sure that we match on pod IP and port if len(ingressRule.ports) != 0 { if err := npc.createPodWithPortPolicyRule(ingressRule.ports, policy, policyChainName, - srcPodIPSetName, targetDestPodIPSetName); err != nil { + srcPodIPSetName, targetDestPodIPSetName, ipFamily); err != nil { return err } } @@ -180,13 +191,13 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if len(ingressRule.namedPorts) != 0 { for portIdx, eps := range ingressRule.namedPorts { namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, - portIdx) - npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips) + portIdx, ipFamily) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips[ipFamily], ipFamily) comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, - eps.protocol, eps.port, eps.endport); err != nil { + eps.protocol, eps.port, eps.endport, ipFamily); err != nil { return err } } @@ -199,7 +210,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, - "", "", ""); err != nil { + "", "", "", ipFamily); err != nil { return err } } @@ -212,20 +223,20 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, - portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + portProtocol.protocol, portProtocol.port, portProtocol.endport, ipFamily); err != nil { return err } } for portIdx, eps := range ingressRule.namedPorts { namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, - portIdx) - npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips) + portIdx, ipFamily) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips[ipFamily], ipFamily) comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, - eps.protocol, eps.port, eps.endport); err != nil { + eps.protocol, eps.port, eps.endport, ipFamily); err != nil { return err } } @@ -237,15 +248,15 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo comment := "rule to ACCEPT traffic 
from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, - "", "", ""); err != nil { + "", "", "", ipFamily); err != nil { return err } } if len(ingressRule.srcIPBlocks) != 0 { - srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, ruleIdx) + srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, ruleIdx, ipFamily) activePolicyIPSets[srcIPBlockIPSetName] = true - npc.ipSetHandler.RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks, utils.TypeHashNet) + npc.ipSetHandlers[ipFamily].RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks[ipFamily], utils.TypeHashNet) if !ingressRule.matchAllPorts { for _, portProtocol := range ingressRule.ports { @@ -253,20 +264,21 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, - portProtocol.endport); err != nil { + portProtocol.endport, ipFamily); err != nil { return err } } for portIdx, eps := range ingressRule.namedPorts { namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, - portIdx) - npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashNet, eps.ips) + portIdx, ipFamily) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashNet, + eps.ips[ipFamily], ipFamily) comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, - namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { + namedPortIPSetName, eps.protocol, eps.port, eps.endport, ipFamily); err != nil { return err } } @@ -275,7 +287,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, - targetDestPodIPSetName, "", "", ""); err != nil { + targetDestPodIPSetName, "", "", "", ipFamily); err != nil { return err } } @@ -287,7 +299,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo //nolint:dupl // This is as simple as this function gets even though it repeats some of ProcessIngressRules func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, - targetSourcePodIPSetName string, activePolicyIPSets map[string]bool, version string) error { + targetSourcePodIPSetName string, activePolicyIPSets map[string]bool, version string, + ipFamily api.IPFamily) error { // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " // so no whitelist rules to be added to the network policy @@ -295,23 +308,23 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, return nil } - policyChainName := networkPolicyChainName(policy.namespace, policy.name, version) + policyChainName := networkPolicyChainName(policy.namespace, policy.name, version, ipFamily) // run through all the egress rules in the 
spec and create iptables rules // in the chain for the network policy for ruleIdx, egressRule := range policy.egressRules { if len(egressRule.dstPods) != 0 { - dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, ruleIdx) + dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, ruleIdx, ipFamily) // Create policy based ipset with destination pod IPs npc.createPolicyIndexedIPSet(activePolicyIPSets, dstPodIPSetName, utils.TypeHashIP, - getIPsFromPods(egressRule.dstPods)) + getIPsFromPods(egressRule.dstPods, ipFamily), ipFamily) // If the egress policy contains port declarations, we need to make sure that we match on pod IP and port if len(egressRule.ports) != 0 { if err := npc.createPodWithPortPolicyRule(egressRule.ports, policy, policyChainName, - targetSourcePodIPSetName, dstPodIPSetName); err != nil { + targetSourcePodIPSetName, dstPodIPSetName, ipFamily); err != nil { return err } } @@ -321,13 +334,13 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if len(egressRule.namedPorts) != 0 { for portIdx, eps := range egressRule.namedPorts { namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, - portIdx) - npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips) + portIdx, ipFamily) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips[ipFamily], ipFamily) comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, - namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { + namedPortIPSetName, eps.protocol, eps.port, eps.endport, ipFamily); err != nil { return err } } @@ -340,7 +353,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, - dstPodIPSetName, "", "", ""); err != nil { + dstPodIPSetName, "", "", "", ipFamily); err != nil { return err } } @@ -353,7 +366,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, - "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + "", portProtocol.protocol, portProtocol.port, portProtocol.endport, ipFamily); err != nil { return err } } @@ -361,7 +374,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, - "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + "", portProtocol.protocol, portProtocol.port, portProtocol.endport, ipFamily); err != nil { return err } } @@ -373,22 +386,22 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, comment := "rule to ACCEPT traffic 
from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, - "", "", "", ""); err != nil { + "", "", "", "", ipFamily); err != nil { return err } } if len(egressRule.dstIPBlocks) != 0 { - dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, ruleIdx) + dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, ruleIdx, ipFamily) activePolicyIPSets[dstIPBlockIPSetName] = true - npc.ipSetHandler.RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks, utils.TypeHashNet) + npc.ipSetHandlers[ipFamily].RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks[ipFamily], utils.TypeHashNet) if !egressRule.matchAllPorts { for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port, - portProtocol.endport); err != nil { + portProtocol.endport, ipFamily); err != nil { return err } } @@ -397,7 +410,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, - dstIPBlockIPSetName, "", "", ""); err != nil { + dstIPBlockIPSetName, "", "", "", ipFamily); err != nil { return err } } @@ -407,7 +420,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, } func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, comment, srcIPSetName, dstIPSetName, - protocol, dPort, endDport string) error { + protocol, dPort, endDport string, ipFamily api.IPFamily) error { args := make([]string, 0) args = append(args, "-A", policyChainName) @@ -435,10 +448,10 @@ func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, com // nolint:gocritic // we want to append to a separate array here so that we can re-use args below markArgs := append(args, "-j", "MARK", "--set-xmark", "0x10000/0x10000", "\n") - npc.filterTableRules.WriteString(strings.Join(markArgs, " ")) + npc.filterTableRules[ipFamily].WriteString(strings.Join(markArgs, " ")) args = append(args, "-m", "mark", "--mark", "0x10000/0x10000", "-j", "RETURN", "\n") - npc.filterTableRules.WriteString(strings.Join(args, " ")) + npc.filterTableRules[ipFamily].WriteString(strings.Join(args, " ")) return nil } @@ -487,7 +500,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI if !isNetPolActionable(matchingPod) { continue } - newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP, + newPolicy.targetPods[matchingPod.Status.PodIP] = podInfo{ips: matchingPod.Status.PodIPs, name: matchingPod.ObjectMeta.Name, namespace: matchingPod.ObjectMeta.Namespace, labels: matchingPod.ObjectMeta.Labels} @@ -510,7 +523,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI for _, specIngressRule := range policy.Spec.Ingress { ingressRule := ingressRule{} ingressRule.srcPods = make([]podInfo, 0) - ingressRule.srcIPBlocks = make([][]string, 0) + ingressRule.srcIPBlocks = 
make(map[api.IPFamily][][]string) // If this field is empty or missing in the spec, this rule matches all sources if len(specIngressRule.From) == 0 { @@ -524,13 +537,21 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI continue } ingressRule.srcPods = append(ingressRule.srcPods, - podInfo{ip: peerPod.Status.PodIP, + podInfo{ips: peerPod.Status.PodIPs, name: peerPod.ObjectMeta.Name, namespace: peerPod.ObjectMeta.Namespace, labels: peerPod.ObjectMeta.Labels}) } } - ingressRule.srcIPBlocks = append(ingressRule.srcIPBlocks, npc.evalIPBlockPeer(peer)...) + peerIPBlock := npc.evalIPBlockPeer(peer) + ingressRule.srcIPBlocks[api.IPv4Protocol] = append( + ingressRule.srcIPBlocks[api.IPv4Protocol], + peerIPBlock[api.IPv4Protocol]..., + ) + ingressRule.srcIPBlocks[api.IPv6Protocol] = append( + ingressRule.srcIPBlocks[api.IPv6Protocol], + peerIPBlock[api.IPv6Protocol]..., + ) } } @@ -551,7 +572,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI for _, specEgressRule := range policy.Spec.Egress { egressRule := egressRule{} egressRule.dstPods = make([]podInfo, 0) - egressRule.dstIPBlocks = make([][]string, 0) + egressRule.dstIPBlocks = make(map[api.IPFamily][][]string) namedPort2EgressEps := make(namedPort2eps) // If this field is empty or missing in the spec, this rule matches all sources @@ -578,7 +599,7 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI continue } egressRule.dstPods = append(egressRule.dstPods, - podInfo{ip: peerPod.Status.PodIP, + podInfo{ips: peerPod.Status.PodIPs, name: peerPod.ObjectMeta.Name, namespace: peerPod.ObjectMeta.Namespace, labels: peerPod.ObjectMeta.Labels}) @@ -586,7 +607,15 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI } } - egressRule.dstIPBlocks = append(egressRule.dstIPBlocks, npc.evalIPBlockPeer(peer)...) 
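For illustration only, the per-family ipBlock data accumulated into srcIPBlocks/dstIPBlocks has the following shape (a minimal sketch; the CIDR values and the local variable name are hypothetical, not taken from the patch):

    // Each IP family maps to a list of ipset entries; each entry is the CIDR
    // followed by its ipset options (timeout, and nomatch for "except" blocks).
    dstIPBlocks := map[api.IPFamily][][]string{
        api.IPv4Protocol: {
            {"10.244.0.0/16", utils.OptionTimeout, "0"},
            {"10.244.1.0/24", utils.OptionTimeout, "0", utils.OptionNoMatch},
        },
        api.IPv6Protocol: {
            {"2001:db8:42::/64", utils.OptionTimeout, "0"},
        },
    }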
+ peerIPBlock := npc.evalIPBlockPeer(peer) + egressRule.dstIPBlocks[api.IPv4Protocol] = append( + egressRule.dstIPBlocks[api.IPv4Protocol], + peerIPBlock[api.IPv4Protocol]..., + ) + egressRule.dstIPBlocks[api.IPv6Protocol] = append( + egressRule.dstIPBlocks[api.IPv6Protocol], + peerIPBlock[api.IPv6Protocol]..., + ) } } @@ -691,21 +720,65 @@ func (npc *NetworkPolicyController) ListNamespaceByLabels(namespaceSelector labe return matchedNamespaces, nil } -func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolicyPeer) [][]string { - ipBlock := make([][]string, 0) +func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolicyPeer) map[api.IPFamily][][]string { + ipBlock := make(map[api.IPFamily][][]string) if peer.PodSelector == nil && peer.NamespaceSelector == nil && peer.IPBlock != nil { - if cidr := peer.IPBlock.CIDR; strings.HasSuffix(cidr, "/0") { - ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0"}, - []string{"128.0.0.0/1", utils.OptionTimeout, "0"}) - } else { - ipBlock = append(ipBlock, []string{cidr, utils.OptionTimeout, "0"}) + cidr := peer.IPBlock.CIDR + + if utilsnet.IsIPv4CIDRString(cidr) { + if strings.HasSuffix(cidr, "/0") { + ipBlock[api.IPv4Protocol] = append( + ipBlock[api.IPv4Protocol], + []string{"0.0.0.0/1", utils.OptionTimeout, "0"}, + []string{"128.0.0.0/1", utils.OptionTimeout, "0"}, + ) + } else { + ipBlock[api.IPv4Protocol] = append( + ipBlock[api.IPv4Protocol], + []string{cidr, utils.OptionTimeout, "0"}, + ) + } + + for _, except := range peer.IPBlock.Except { + if strings.HasSuffix(except, "/0") { + ipBlock[api.IPv4Protocol] = append( + ipBlock[api.IPv4Protocol], + []string{"0.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, + []string{"128.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, + ) + } else { + ipBlock[api.IPv4Protocol] = append( + ipBlock[api.IPv4Protocol], + []string{except, utils.OptionTimeout, "0", utils.OptionNoMatch}, + ) + } + } } - for _, except := range peer.IPBlock.Except { - if strings.HasSuffix(except, "/0") { - ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, - []string{"128.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}) + + if utilsnet.IsIPv6CIDRString(cidr) { + if strings.HasSuffix(cidr, "/0") { + ipBlock[api.IPv6Protocol] = append( + ipBlock[api.IPv6Protocol], + []string{"2000::/3", utils.OptionTimeout, "0"}, + []string{"fd00::/8", utils.OptionTimeout, "0"}, + ) } else { - ipBlock = append(ipBlock, []string{except, utils.OptionTimeout, "0", utils.OptionNoMatch}) + ipBlock[api.IPv6Protocol] = append(ipBlock[api.IPv6Protocol], []string{cidr, utils.OptionTimeout, "0"}) + } + + for _, except := range peer.IPBlock.Except { + if strings.HasSuffix(except, "/0") { + ipBlock[api.IPv6Protocol] = append( + ipBlock[api.IPv6Protocol], + []string{"2000::/3", utils.OptionTimeout, "0", utils.OptionNoMatch}, + []string{"fd00::/8", utils.OptionTimeout, "0", utils.OptionNoMatch}, + ) + } else { + ipBlock[api.IPv6Protocol] = append( + ipBlock[api.IPv6Protocol], + []string{except, utils.OptionTimeout, "0", utils.OptionNoMatch}, + ) + } } } } @@ -716,6 +789,16 @@ func (npc *NetworkPolicyController) grabNamedPortFromPod(pod *api.Pod, namedPort if pod == nil || namedPort2eps == nil { return } + + ips := make(map[api.IPFamily][]string) + for _, ip := range pod.Status.PodIPs { + if utilsnet.IsIPv4String(ip.IP) { + ips[api.IPv4Protocol] = append(ips[api.IPv4Protocol], ip.IP) + } else if utilsnet.IsIPv6String(ip.IP) { + 
ips[api.IPv6Protocol] = append(ips[api.IPv6Protocol], ip.IP) + } + } + for k := range pod.Spec.Containers { for _, port := range pod.Spec.Containers[k].Ports { name := port.Name @@ -730,68 +813,77 @@ func (npc *NetworkPolicyController) grabNamedPortFromPod(pod *api.Pod, namedPort } if eps, ok := (*namedPort2eps)[name][protocol][containerPort]; !ok { (*namedPort2eps)[name][protocol][containerPort] = &endPoints{ - ips: []string{pod.Status.PodIP}, + ips: ips, protocolAndPort: protocolAndPort{port: containerPort, protocol: protocol}, } } else { - eps.ips = append(eps.ips, pod.Status.PodIP) + eps.ips = ips } } } } -func networkPolicyChainName(namespace, policyName string, version string) string { - hash := sha256.Sum256([]byte(namespace + policyName + version)) +func networkPolicyChainName(namespace, policyName string, version string, ipFamily api.IPFamily) string { + hash := sha256.Sum256([]byte(namespace + policyName + version + string(ipFamily))) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeNetworkPolicyChainPrefix + encoded[:16] } -func policySourcePodIPSetName(namespace, policyName string) string { - hash := sha256.Sum256([]byte(namespace + policyName)) +func policySourcePodIPSetName(namespace, policyName string, ipFamily api.IPFamily) string { + hash := sha256.Sum256([]byte(namespace + policyName + string(ipFamily))) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeSourceIPSetPrefix + encoded[:16] } -func policyDestinationPodIPSetName(namespace, policyName string) string { - hash := sha256.Sum256([]byte(namespace + policyName)) +func policyDestinationPodIPSetName(namespace, policyName string, ipFamily api.IPFamily) string { + hash := sha256.Sum256([]byte(namespace + policyName + string(ipFamily))) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedSourcePodIPSetName(namespace, policyName string, ingressRuleNo int) string { - hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "pod")) +func policyIndexedSourcePodIPSetName(namespace, policyName string, ingressRuleNo int, ipFamily api.IPFamily) string { + hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + + string(ipFamily) + "pod")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeSourceIPSetPrefix + encoded[:16] } -func policyIndexedDestinationPodIPSetName(namespace, policyName string, egressRuleNo int) string { - hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "pod")) +func policyIndexedDestinationPodIPSetName(namespace, policyName string, egressRuleNo int, + ipFamily api.IPFamily) string { + hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + + string(ipFamily) + "pod")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedSourceIPBlockIPSetName(namespace, policyName string, ingressRuleNo int) string { - hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "ipblock")) +func policyIndexedSourceIPBlockIPSetName(namespace, policyName string, ingressRuleNo int, + ipFamily api.IPFamily) string { + hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + + string(ipFamily) + "ipblock")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeSourceIPSetPrefix + 
encoded[:16] } -func policyIndexedDestinationIPBlockIPSetName(namespace, policyName string, egressRuleNo int) string { - hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "ipblock")) +func policyIndexedDestinationIPBlockIPSetName(namespace, policyName string, egressRuleNo int, + ipFamily api.IPFamily) string { + hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + + string(ipFamily) + "ipblock")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedIngressNamedPortIPSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string { +func policyIndexedIngressNamedPortIPSetName(namespace, policyName string, ingressRuleNo, namedPortNo int, + ipFamily api.IPFamily) string { hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + - strconv.Itoa(namedPortNo) + "namedport")) + strconv.Itoa(namedPortNo) + string(ipFamily) + "namedport")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string { +func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressRuleNo, namedPortNo int, + ipFamily api.IPFamily) string { hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + - strconv.Itoa(namedPortNo) + "namedport")) + strconv.Itoa(namedPortNo) + string(ipFamily) + "namedport")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } diff --git a/pkg/controllers/netpol/utils.go b/pkg/controllers/netpol/utils.go index bf4785058..178b1347c 100644 --- a/pkg/controllers/netpol/utils.go +++ b/pkg/controllers/netpol/utils.go @@ -8,6 +8,8 @@ import ( "github.com/cloudnativelabs/kube-router/pkg/utils" api "k8s.io/api/core/v1" + "k8s.io/klog/v2" + utilsnet "k8s.io/utils/net" ) const ( @@ -69,40 +71,75 @@ func validateNodePortRange(nodePortOption string) (string, error) { return fmt.Sprintf("%d:%d", port1, port2), nil } -func getIPsFromPods(pods []podInfo) []string { - ips := make([]string, len(pods)) - for idx, pod := range pods { - ips[idx] = pod.ip +func getIPsFromPods(pods []podInfo, family api.IPFamily) []string { + var ips []string + for _, pod := range pods { + switch family { + case api.IPv4Protocol: + ip, err := getPodIPv4Address(pod) + if err != nil { + klog.Warningf("Could not get IPv4 addresses of all pods: %v", err) + continue + } + ips = append(ips, ip) + case api.IPv6Protocol: + ip, err := getPodIPv6Address(pod) + if err != nil { + klog.Warningf("Could not get IPv6 addresses of all pods: %v", err) + continue + } + ips = append(ips, ip) + } } return ips } -func (npc *NetworkPolicyController) createGenericHashIPSet(ipsetName, hashType string, ips []string) { +func (npc *NetworkPolicyController) createGenericHashIPSet(ipsetName, hashType string, ips []string, + ipFamily api.IPFamily) { setEntries := make([][]string, 0) for _, ip := range ips { setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) } - npc.ipSetHandler.RefreshSet(ipsetName, setEntries, hashType) + npc.ipSetHandlers[ipFamily].RefreshSet(ipsetName, setEntries, hashType) } // createPolicyIndexedIPSet creates a policy based ipset and indexes it as an active ipset func (npc *NetworkPolicyController) createPolicyIndexedIPSet( - 
activePolicyIPSets map[string]bool, ipsetName, hashType string, ips []string) { + activePolicyIPSets map[string]bool, ipsetName, hashType string, ips []string, ipFamily api.IPFamily) { activePolicyIPSets[ipsetName] = true - npc.createGenericHashIPSet(ipsetName, hashType, ips) + npc.createGenericHashIPSet(ipsetName, hashType, ips, ipFamily) } // createPodWithPortPolicyRule handles the case where port details are provided by the ingress/egress rule and creates // an iptables rule that matches on both the source/dest IPs and the port func (npc *NetworkPolicyController) createPodWithPortPolicyRule( - ports []protocolAndPort, policy networkPolicyInfo, policyName string, srcSetName string, dstSetName string) error { + ports []protocolAndPort, policy networkPolicyInfo, policyName string, srcSetName string, dstSetName string, + ipFamily api.IPFamily) error { for _, portProtocol := range ports { comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace if err := npc.appendRuleToPolicyChain(policyName, comment, srcSetName, dstSetName, portProtocol.protocol, - portProtocol.port, portProtocol.endport); err != nil { + portProtocol.port, portProtocol.endport, ipFamily); err != nil { return err } } return nil } + +func getPodIPv6Address(pod podInfo) (string, error) { + for _, ip := range pod.ips { + if utilsnet.IsIPv6String(ip.IP) { + return ip.IP, nil + } + } + return "", fmt.Errorf("pod %s has no IPv6Address", pod.name) +} + +func getPodIPv4Address(pod podInfo) (string, error) { + for _, ip := range pod.ips { + if utilsnet.IsIPv4String(ip.IP) { + return ip.IP, nil + } + } + return "", fmt.Errorf("pod %s has no IPv4Address", pod.name) +} diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index d17f14735..391075f63 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -719,21 +719,21 @@ func (nsc *NetworkServicesController) cleanupIpvsFirewall() { return } - if _, ok := ipSetHandler.Sets[localIPsIPSetName]; ok { + if _, ok := ipSetHandler.Sets()[localIPsIPSetName]; ok { err = ipSetHandler.Destroy(localIPsIPSetName) if err != nil { klog.Errorf("failed to destroy ipset: %s", err.Error()) } } - if _, ok := ipSetHandler.Sets[serviceIPsIPSetName]; ok { + if _, ok := ipSetHandler.Sets()[serviceIPsIPSetName]; ok { err = ipSetHandler.Destroy(serviceIPsIPSetName) if err != nil { klog.Errorf("failed to destroy ipset: %s", err.Error()) } } - if _, ok := ipSetHandler.Sets[ipvsServicesIPSetName]; ok { + if _, ok := ipSetHandler.Sets()[ipvsServicesIPSetName]; ok { err = ipSetHandler.Destroy(ipvsServicesIPSetName) if err != nil { klog.Errorf("failed to destroy ipset: %s", err.Error()) diff --git a/pkg/options/options.go b/pkg/options/options.go index fc09fc89e..7a100651f 100644 --- a/pkg/options/options.go +++ b/pkg/options/options.go @@ -32,6 +32,8 @@ type KubeRouterConfig struct { DisableSrcDstCheck bool EnableCNI bool EnableiBGP bool + EnableIPv4 bool + EnableIPv6 bool EnableOverlay bool EnablePodEgress bool EnablePprof bool @@ -84,6 +86,7 @@ func NewKubeRouterConfig() *KubeRouterConfig { CacheSyncTimeout: 1 * time.Minute, ClusterIPCIDR: "10.96.0.0/12", EnableOverlay: true, + EnableIPv4: true, IPTablesSyncPeriod: 5 * time.Minute, IpvsGracefulPeriod: 30 * time.Second, IpvsSyncPeriod: 5 * time.Minute, @@ -133,6 +136,8 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { "Enable CNI plugin. 
Disable if you want to use kube-router features alongside another CNI plugin.") fs.BoolVar(&s.EnableiBGP, "enable-ibgp", true, "Enables peering with nodes with the same ASN, if disabled will only peer with external BGP peers") + fs.BoolVar(&s.EnableIPv4, "enable-ipv4", true, "Enables IPv4 support") + fs.BoolVar(&s.EnableIPv6, "enable-ipv6", true, "Enables IPv6 support") fs.BoolVar(&s.EnableOverlay, "enable-overlay", true, "When enable-overlay is set to true, IP-in-IP tunneling is used for pod-to-pod networking across "+ "nodes in different subnets. When set to false no tunneling is used and routing infrastructure is "+ diff --git a/pkg/utils/ipset.go b/pkg/utils/ipset.go index e0ebb25d3..df21c1699 100644 --- a/pkg/utils/ipset.go +++ b/pkg/utils/ipset.go @@ -150,10 +150,23 @@ const ( tmpIPSetPrefix = "TMP-" ) +type IPSetHandler interface { + Create(setName string, createOptions ...string) (*Set, error) + Add(set *Set) error + RefreshSet(setName string, entriesWithOptions [][]string, setType string) + Destroy(setName string) error + DestroyAllWithin() error + Save() error + Restore() error + Flush() error + Get(setName string) *Set + Sets() map[string]*Set +} + // IPSet represent ipset sets managed by. type IPSet struct { ipSetPath *string - Sets map[string]*Set + sets map[string]*Set isIpv6 bool } @@ -225,7 +238,7 @@ func NewIPSet(isIpv6 bool) (*IPSet, error) { } ipSet := &IPSet{ ipSetPath: ipSetPath, - Sets: make(map[string]*Set), + sets: make(map[string]*Set), isIpv6: isIpv6, } return ipSet, nil @@ -237,7 +250,7 @@ func NewIPSet(isIpv6 bool) (*IPSet, error) { func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error) { // Populate Set map if needed if ipset.Get(setName) == nil { - ipset.Sets[setName] = &Set{ + ipset.sets[setName] = &Set{ Name: setName, Options: createOptions, Parent: ipset, @@ -245,7 +258,7 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error } // Determine if set with the same name is already active on the system - setIsActive, err := ipset.Sets[setName].IsActive() + setIsActive, err := ipset.sets[setName].IsActive() if err != nil { return nil, fmt.Errorf("failed to determine if ipset set %s exists: %s", setName, err) @@ -255,7 +268,7 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error if !setIsActive { if ipset.isIpv6 { // Add "family inet6" option and a "inet6:" prefix for IPv6 sets. - args := []string{"create", "-exist", ipset.Sets[setName].name()} + args := []string{"create", "-exist", ipset.sets[setName].name()} args = append(args, createOptions...) 
args = append(args, "family", "inet6") if _, err := ipset.run(args...); err != nil { @@ -269,7 +282,7 @@ func (ipset *IPSet) Create(setName string, createOptions ...string) (*Set, error } } } - return ipset.Sets[setName], nil + return ipset.sets[setName], nil } // Add a given Set to an IPSet @@ -295,15 +308,19 @@ func (ipset *IPSet) Add(set *Set) error { // RefreshSet add/update internal Sets with a Set of entries but does not run restore command func (ipset *IPSet) RefreshSet(setName string, entriesWithOptions [][]string, setType string) { if ipset.Get(setName) == nil { - ipset.Sets[setName] = &Set{ + options := []string{setType, OptionTimeout, "0"} + if ipset.isIpv6 { + options = append(options, "family", "inet6") + } + ipset.sets[setName] = &Set{ Name: setName, - Options: []string{setType, OptionTimeout, "0"}, + Options: options, Parent: ipset, } } entries := make([]*Entry, len(entriesWithOptions)) for i, entry := range entriesWithOptions { - entries[i] = &Entry{Set: ipset.Sets[setName], Options: entry} + entries[i] = &Entry{Set: ipset.sets[setName], Options: entry} } ipset.Get(setName).Entries = entries } @@ -386,7 +403,7 @@ func (set *Set) Destroy() error { return err } - delete(set.Parent.Sets, set.Name) + delete(set.Parent.sets, set.Name) return nil } @@ -409,7 +426,7 @@ func (ipset *IPSet) Destroy(setName string) error { // DestroyAllWithin destroys all sets contained within the IPSet's Sets. func (ipset *IPSet) DestroyAllWithin() error { - for _, v := range ipset.Sets { + for _, v := range ipset.sets { err := v.Destroy() if err != nil { return err @@ -471,8 +488,8 @@ func parseIPSetSave(ipset *IPSet, result string) map[string]*Set { // create KUBE-DST-3YNVZWWGX3UQQ4VQ hash:ip family inet hashsize 1024 maxelem 65536 timeout 0 // add KUBE-DST-3YNVZWWGX3UQQ4VQ 100.96.1.6 timeout 0 func buildIPSetRestore(ipset *IPSet) string { - setNames := make([]string, 0, len(ipset.Sets)) - for setName := range ipset.Sets { + setNames := make([]string, 0, len(ipset.sets)) + for setName := range ipset.sets { // we need setNames in some consistent order so that we can unit-test this method has a predictable output: setNames = append(setNames, setName) } @@ -482,7 +499,7 @@ func buildIPSetRestore(ipset *IPSet) string { tmpSets := map[string]string{} ipSetRestore := &strings.Builder{} for _, setName := range setNames { - set := ipset.Sets[setName] + set := ipset.sets[setName] setOptions := strings.Join(set.Options, " ") tmpSetName := tmpSets[setOptions] @@ -536,7 +553,7 @@ func (ipset *IPSet) Save() error { if err != nil { return err } - ipset.Sets = parseIPSetSave(ipset, stdout) + ipset.sets = parseIPSetSave(ipset, stdout) return nil } @@ -575,7 +592,7 @@ func (ipset *IPSet) Flush() error { // Get Set by Name. func (ipset *IPSet) Get(setName string) *Set { - set, ok := ipset.Sets[setName] + set, ok := ipset.sets[setName] if !ok { return nil } @@ -583,6 +600,10 @@ func (ipset *IPSet) Get(setName string) *Set { return set } +func (ipset *IPSet) Sets() map[string]*Set { + return ipset.sets +} + // Rename a set. Set identified by SETNAME-TO must not exist. 
func (set *Set) Rename(newName string) error { if set.Parent.isIpv6 { diff --git a/pkg/utils/ipset_test.go b/pkg/utils/ipset_test.go index 152556823..d7445092b 100644 --- a/pkg/utils/ipset_test.go +++ b/pkg/utils/ipset_test.go @@ -14,7 +14,7 @@ func Test_buildIPSetRestore(t *testing.T) { { name: "simple-restore", args: args{ - ipset: &IPSet{Sets: map[string]*Set{ + ipset: &IPSet{sets: map[string]*Set{ "foo": { Name: "foo", Options: []string{"hash:ip", "yolo", "things", "12345"}, diff --git a/pkg/utils/iptables.go b/pkg/utils/iptables.go index d4cdfe64a..d9a186381 100644 --- a/pkg/utils/iptables.go +++ b/pkg/utils/iptables.go @@ -5,10 +5,42 @@ import ( "fmt" "os/exec" "strings" + + "github.com/coreos/go-iptables/iptables" + v1core "k8s.io/api/core/v1" ) var hasWait bool +// Interface based on the IPTables struct from github.com/coreos/go-iptables +// which allows to mock it. +type IPTablesHandler interface { + Proto() iptables.Protocol + Exists(table, chain string, rulespec ...string) (bool, error) + Insert(table, chain string, pos int, rulespec ...string) error + Append(table, chain string, rulespec ...string) error + AppendUnique(table, chain string, rulespec ...string) error + Delete(table, chain string, rulespec ...string) error + DeleteIfExists(table, chain string, rulespec ...string) error + List(table, chain string) ([]string, error) + ListWithCounters(table, chain string) ([]string, error) + ListChains(table string) ([]string, error) + ChainExists(table, chain string) (bool, error) + Stats(table, chain string) ([][]string, error) + ParseStat(stat []string) (iptables.Stat, error) + StructuredStats(table, chain string) ([]iptables.Stat, error) + NewChain(table, chain string) error + ClearChain(table, chain string) error + RenameChain(table, oldChain, newChain string) error + DeleteChain(table, chain string) error + ClearAndDeleteChain(table, chain string) error + ClearAll() error + DeleteAll() error + ChangePolicy(table, chain, target string) error + HasRandomFully() bool + GetIptablesVersion() (int, int, int) +} + //nolint:gochecknoinits // This is actually a good usage of the init() function func init() { path, err := exec.LookPath("iptables-restore") @@ -74,30 +106,88 @@ func Restore(table string, data []byte) error { // AppendUnique ensures that rule is in chain only once in the buffer and that the occurrence is at the end of the // buffer -func AppendUnique(buffer bytes.Buffer, chain string, rule []string) bytes.Buffer { - var desiredBuffer bytes.Buffer - +func AppendUnique(buffer *bytes.Buffer, chain string, rule []string) { // First we need to remove any previous instances of the rule that exist, so that we can be sure that our version // is unique and appended to the very end of the buffer rules := strings.Split(buffer.String(), "\n") if len(rules) > 0 && rules[len(rules)-1] == "" { rules = rules[:len(rules)-1] } + buffer.Reset() + for _, foundRule := range rules { if strings.Contains(foundRule, chain) && strings.Contains(foundRule, strings.Join(rule, " ")) { continue } - desiredBuffer.WriteString(foundRule + "\n") + buffer.WriteString(foundRule + "\n") } // Now append the rule that we wanted to be unique - desiredBuffer = Append(desiredBuffer, chain, rule) - return desiredBuffer + Append(buffer, chain, rule) } // Append appends rule to chain at the end of buffer -func Append(buffer bytes.Buffer, chain string, rule []string) bytes.Buffer { - ruleStr := strings.Join(append([]string{"-A", chain}, rule...), " ") - buffer.WriteString(ruleStr + "\n") - return buffer +func 
Append(buffer *bytes.Buffer, chain string, rule []string) {
+	ruleStr := strings.Join(append(append([]string{"-A", chain}, rule...), "\n"), " ")
+	buffer.WriteString(ruleStr)
+}
+
+type IPTablesSaveRestore struct {
+	saveCmd    string
+	restoreCmd string
+}
+
+func NewIPTablesSaveRestore(ipFamily v1core.IPFamily) *IPTablesSaveRestore {
+	switch ipFamily {
+	case v1core.IPv6Protocol:
+		return &IPTablesSaveRestore{
+			saveCmd:    "ip6tables-save",
+			restoreCmd: "ip6tables-restore",
+		}
+	case v1core.IPv4Protocol:
+		fallthrough
+	default:
+		return &IPTablesSaveRestore{
+			saveCmd:    "iptables-save",
+			restoreCmd: "iptables-restore",
+		}
+	}
+}
+
+func (i *IPTablesSaveRestore) exec(cmdName string, args []string, data []byte, stdoutBuffer *bytes.Buffer) error {
+	path, err := exec.LookPath(cmdName)
+	if err != nil {
+		return err
+	}
+	stderrBuffer := bytes.NewBuffer(nil)
+	cmd := exec.Cmd{
+		Path:   path,
+		Args:   append([]string{cmdName}, args...),
+		Stderr: stderrBuffer,
+	}
+	if data != nil {
+		cmd.Stdin = bytes.NewBuffer(data)
+	}
+	if stdoutBuffer != nil {
+		cmd.Stdout = stdoutBuffer
+	}
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to call %s: %v (%s)", cmdName, err, stderrBuffer)
+	}
+
+	return nil
+}
+
+func (i *IPTablesSaveRestore) SaveInto(table string, buffer *bytes.Buffer) error {
+	return i.exec(i.saveCmd, []string{"-t", table}, nil, buffer)
+}
+
+func (i *IPTablesSaveRestore) Restore(table string, data []byte) error {
+	var args []string
+	if hasWait {
+		args = []string{"--wait", "-T", table}
+	} else {
+		args = []string{"-T", table}
+	}
+	return i.exec(i.restoreCmd, args, data, nil)
 }
diff --git a/pkg/utils/node.go b/pkg/utils/node.go
index 8e2e3f2ec..97d0c5754 100644
--- a/pkg/utils/node.go
+++ b/pkg/utils/node.go
@@ -12,6 +12,7 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	netutils "k8s.io/utils/net"
 )
 
 // GetNodeObject returns the node API object for the node
@@ -62,6 +63,63 @@ func GetNodeIP(node *apiv1.Node) (net.IP, error) {
 	return nil, errors.New("host IP unknown")
 }
 
+// addressMap is a mapping of address types to a list of addresses of that type.
+// It preallocates the slices of addresses.
+type addressMap map[apiv1.NodeAddressType][]apiv1.NodeAddress
+
+// add adds an address of the given type to the address map. If the given type
+// was not already in the map, it creates a new preallocated entry for it.
+func (m addressMap) add(address apiv1.NodeAddress) {
+	if _, ok := m[address.Type]; ok {
+		m[address.Type] = append(m[address.Type], address)
+	} else {
+		// There can be at most 2 addresses of the same type.
+		m[address.Type] = make([]apiv1.NodeAddress, 0, 2)
+		m[address.Type] = append(m[address.Type], address)
+	}
+}
+
+// GetNodeIPDualStack returns the most valid external facing IP addresses for a node (IPv4 and IPv6).
+// Order of preference:
+// 1. NodeInternalIP
+// 2. 
NodeExternalIP (Only set on cloud providers usually) +func GetNodeIPDualStack(node *apiv1.Node, enableIPv4, enableIPv6 bool) (net.IP, net.IP, error) { + var ipAddrv4, ipAddrv6 net.IP + addresses := node.Status.Addresses + addressesPerType := make(addressMap) + for _, address := range addresses { + addressesPerType.add(address) + } + if addresses, ok := addressesPerType[apiv1.NodeInternalIP]; ok { + for _, address := range addresses { + if ipAddrv4 == nil && enableIPv4 && netutils.IsIPv4String(address.Address) { + ipAddrv4 = net.ParseIP(address.Address) + } + if ipAddrv6 == nil && enableIPv6 && netutils.IsIPv6String(address.Address) { + ipAddrv6 = net.ParseIP(address.Address) + } + } + } + if addresses, ok := addressesPerType[apiv1.NodeExternalIP]; ok { + for _, address := range addresses { + if ipAddrv4 == nil && enableIPv4 && netutils.IsIPv4String(address.Address) { + ipAddrv4 = net.ParseIP(address.Address) + } + if ipAddrv6 == nil && enableIPv6 && netutils.IsIPv6String(address.Address) { + ipAddrv6 = net.ParseIP(address.Address) + } + } + } + + if enableIPv4 && ipAddrv4 == nil { + return nil, nil, errors.New("host IPv4 unknown") + } + if enableIPv6 && ipAddrv6 == nil { + return nil, nil, errors.New("host IPv6 unknown") + } + return ipAddrv4, ipAddrv6, nil +} + // GetMTUFromNodeIP returns the MTU by detecting it from the IP on the node and figuring in tunneling configurations func GetMTUFromNodeIP(nodeIP net.IP, overlayEnabled bool) (int, error) { links, err := netlink.LinkList() diff --git a/pkg/utils/pod_cidr.go b/pkg/utils/pod_cidr.go index f5118a9cc..da83b8d6b 100644 --- a/pkg/utils/pod_cidr.go +++ b/pkg/utils/pod_cidr.go @@ -9,7 +9,9 @@ import ( "github.com/containernetworking/cni/libcni" "github.com/containernetworking/plugins/plugins/ipam/host-local/backend/allocator" + v1core "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" + netutils "k8s.io/utils/net" ) const ( @@ -142,3 +144,34 @@ func GetPodCidrFromNodeSpec(clientset kubernetes.Interface, hostnameOverride str return node.Spec.PodCIDR, nil } + +func GetPodCidrsFromNodeSpecDualStack(node *v1core.Node) (string, string, error) { + var podCidrv4, podCidrv6 string + + if cidrs, ok := node.Annotations[podCIDRAnnotation]; ok { + for _, cidr := range strings.Split(cidrs, ",") { + if podCidrv4 == "" && netutils.IsIPv4CIDRString(cidr) { + podCidrv4 = cidr + } + if podCidrv6 == "" && netutils.IsIPv6CIDRString(cidr) { + podCidrv6 = cidr + } + } + return podCidrv4, podCidrv6, nil + } + + if len(node.Spec.PodCIDRs) == 0 { + return "", "", fmt.Errorf("node.Spec.PodCIDRs empty for node: %v", node.Name) + } + + for _, cidr := range node.Spec.PodCIDRs { + if podCidrv4 == "" && netutils.IsIPv4CIDRString(cidr) { + podCidrv4 = cidr + } + if podCidrv6 == "" && netutils.IsIPv6CIDRString(cidr) { + podCidrv6 = cidr + } + } + + return podCidrv4, podCidrv6, nil +}
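For reference, below is a rough sketch of how the new pkg/utils helpers could be combined by a caller. It is not part of the patch: the package name, resolveNodeNetworking, and the per-family map wiring are purely illustrative; only the functions introduced above (GetNodeIPDualStack, GetPodCidrsFromNodeSpecDualStack, NewIPTablesSaveRestore, SaveInto) are assumed to exist with the signatures shown in the diff.

package example

import (
	"bytes"
	"fmt"

	"github.com/cloudnativelabs/kube-router/pkg/utils"
	v1core "k8s.io/api/core/v1"
)

// resolveNodeNetworking is a hypothetical caller: it resolves the node's
// per-family IPs and pod CIDRs, then saves the filter table once per enabled
// IP family using the IPTablesSaveRestore wrapper added by this patch.
func resolveNodeNetworking(node *v1core.Node, enableIPv4, enableIPv6 bool) error {
	// Per-family node IPs; a value is nil when its family is disabled.
	nodeIPv4, nodeIPv6, err := utils.GetNodeIPDualStack(node, enableIPv4, enableIPv6)
	if err != nil {
		return fmt.Errorf("failed to resolve node IPs: %w", err)
	}

	// Per-family pod CIDRs from the node annotation or node.Spec.PodCIDRs.
	podCidrV4, podCidrV6, err := utils.GetPodCidrsFromNodeSpecDualStack(node)
	if err != nil {
		return fmt.Errorf("failed to resolve pod CIDRs: %w", err)
	}
	fmt.Printf("IPv4: %s %s, IPv6: %s %s\n", nodeIPv4, podCidrV4, nodeIPv6, podCidrV6)

	// One iptables-save/iptables-restore wrapper per enabled family.
	saveRestore := make(map[v1core.IPFamily]*utils.IPTablesSaveRestore, 2)
	if enableIPv4 {
		saveRestore[v1core.IPv4Protocol] = utils.NewIPTablesSaveRestore(v1core.IPv4Protocol)
	}
	if enableIPv6 {
		saveRestore[v1core.IPv6Protocol] = utils.NewIPTablesSaveRestore(v1core.IPv6Protocol)
	}

	// Each family gets its own buffer, so IPv4 and IPv6 rules never mix.
	for family, sr := range saveRestore {
		buf := bytes.NewBuffer(nil)
		if err := sr.SaveInto("filter", buf); err != nil {
			return fmt.Errorf("failed to save %s filter table: %w", family, err)
		}
	}
	return nil
}

Keying the wrappers by v1core.IPFamily lets a caller run identical save/restore logic for both families without branching on address family at every call site.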