From cda84bd398077217237295bbce3c81d72d63ae6d Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Tue, 28 May 2024 22:48:33 +0800 Subject: [PATCH] Ensure full functionality of AntreaProxy with proxyAll enabled when kube-proxy presents Signed-off-by: Hongliang Liu --- .github/workflows/kind.yml | 1 + ci/kind/test-e2e-kind.sh | 10 +- cmd/antrea-agent/agent.go | 2 + docs/antrea-proxy.md | 18 +- .../networkpolicy/node_reconciler_linux.go | 15 +- pkg/agent/proxy/proxier.go | 65 +- pkg/agent/proxy/proxier_test.go | 151 +++-- pkg/agent/route/interfaces.go | 20 +- pkg/agent/route/route_linux.go | 556 ++++++++++++++---- pkg/agent/route/route_linux_test.go | 343 +++++++++-- pkg/agent/route/route_windows.go | 21 +- pkg/agent/route/route_windows_test.go | 8 +- pkg/agent/route/testing/mock_route.go | 82 ++- pkg/agent/util/iptables/builder.go | 55 +- pkg/agent/util/iptables/builder_test.go | 28 +- pkg/agent/util/iptables/iptables.go | 28 +- .../iptables/testing/mock_iptables_linux.go | 12 +- pkg/util/env/env.go | 25 + pkg/util/k8s/client.go | 8 +- pkg/util/k8s/client_test.go | 8 +- test/e2e/proxy_test.go | 39 +- test/integration/agent/route_test.go | 18 +- 22 files changed, 1200 insertions(+), 313 deletions(-) diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml index eeaad6eaa74..fdfcd235e65 100644 --- a/.github/workflows/kind.yml +++ b/.github/workflows/kind.yml @@ -238,6 +238,7 @@ jobs: --coverage \ --feature-gates AllAlpha=true,AllBeta=true \ --proxy-all \ + --no-kube-proxy \ --node-ipam \ --extra-vlan \ --multicast \ diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 0585116186f..f9268c45eb4 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -28,6 +28,7 @@ _usage="Usage: $0 [--encap-mode ] [--ip-family ] [--coverage] --feature-gates A comma-separated list of key=value pairs that describe feature gates, e.g. AntreaProxy=true,Egress=false. --run Run only tests matching the regexp. --proxy-all Enables Antrea proxy with all Service support. + --no-kube-proxy Don't deploy kube-proxy. --load-balancer-mode LoadBalancer mode. --node-ipam Enables Antrea NodeIPAM. --multicast Enables Multicast. @@ -73,6 +74,7 @@ mode="" ipfamily="v4" feature_gates="" proxy_all=false +no_kube_proxy=false load_balancer_mode="" node_ipam=false multicast=false @@ -108,6 +110,10 @@ case $key in proxy_all=true shift ;; + --no-kube-proxy) + no_kube_proxy=true + shift + ;; --load-balancer-mode) load_balancer_mode="$2" shift 2 @@ -305,7 +311,7 @@ function setup_cluster { echoerr "invalid value for --ip-family \"$ipfamily\", expected \"v4\" or \"v6\"" exit 1 fi - if $proxy_all; then + if $no_kube_proxy; then args="$args --no-kube-proxy" fi if $node_ipam; then @@ -359,7 +365,7 @@ function run_test { cat $CH_OPERATOR_YML | docker exec -i kind-control-plane dd of=/root/clickhouse-operator-install-bundle.yml fi - if $proxy_all; then + if $no_kube_proxy; then apiserver=$(docker exec -i kind-control-plane kubectl get endpoints kubernetes --no-headers | awk '{print $2}') if $coverage; then docker exec -i kind-control-plane sed -i.bak -E "s/^[[:space:]]*[#]?kubeAPIServerOverride[[:space:]]*:[[:space:]]*[a-z\"]+[[:space:]]*$/ kubeAPIServerOverride: \"$apiserver\"/" /root/antrea-coverage.yml /root/antrea-ipsec-coverage.yml diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 52c3a44cc7e..3473a687eb6 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -235,6 +235,7 @@ func run(o *Options) error { routeClient, err := route.NewClient(networkConfig, o.config.NoSNAT, o.config.AntreaProxy.ProxyAll, + o.config.KubeAPIServerOverride != "", connectUplinkToBridge, nodeNetworkPolicyEnabled, multicastEnabled, @@ -464,6 +465,7 @@ func run(o *Options) error { o.defaultLoadBalancerMode, v4GroupCounter, v6GroupCounter, + o.config.KubeAPIServerOverride != "", enableMulticlusterGW) if err != nil { return fmt.Errorf("error when creating proxier: %v", err) diff --git a/docs/antrea-proxy.md b/docs/antrea-proxy.md index 1469a76fb49..50c016f51ef 100644 --- a/docs/antrea-proxy.md +++ b/docs/antrea-proxy.md @@ -42,14 +42,20 @@ the introduction of `proxyAll`, Antrea relied on userspace kube-proxy, which is no longer actively maintained by the K8s community and is slower than other kube-proxy backends. -Note that on Linux, even when `proxyAll` is enabled, kube-proxy will usually -take priority and will keep handling NodePort Service traffic (unless the source -is a Pod, which is pretty unusual as Pods typically access Services by -ClusterIP). This is because kube-proxy rules typically come before the rules -installed by AntreaProxy to redirect traffic to OVS. When kube-proxy is not -deployed or is removed from the cluster, AntreaProxy will then handle all +Note that on Linux, before Antrea v2.1, when `proxyAll` is enabled, kube-proxy +will usually take priority and will keep handling NodePort Service traffic +(unless the source is a Pod, which is pretty unusual as Pods typically access +Services by ClusterIP). This is because kube-proxy rules typically come before +the rules installed by AntreaProxy to redirect traffic to OVS. When kube-proxy +is not deployed or is removed from the cluster, AntreaProxy will then handle all Service traffic. +It's worth noting that starting with Antrea v2.1, when only `proxyAll` is enabled, +even if kube-proxy is present, AntreaProxy is capable of handing all types of +Service traffic except for that of the kube Service,. This is accomplished by +prioritizing the rules installed by AntreaProxy redirecting Service traffic to OVS +over those installed by kube-proxy. + ### Removing kube-proxy In this section, we will provide steps to run a K8s cluster without kube-proxy, diff --git a/pkg/agent/controller/networkpolicy/node_reconciler_linux.go b/pkg/agent/controller/networkpolicy/node_reconciler_linux.go index dba3bb55bff..f3c2d3982ee 100644 --- a/pkg/agent/controller/networkpolicy/node_reconciler_linux.go +++ b/pkg/agent/controller/networkpolicy/node_reconciler_linux.go @@ -31,6 +31,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/agent/util/ipset" "antrea.io/antrea/pkg/agent/util/iptables" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" @@ -42,6 +43,8 @@ const ( ipv6Any = "::/0" ) +var ipsetTypeHashIP = ipset.HashIP + /* Tips: In the following, service describes a port to allow traffic on which is defined in pkg/apis/controlplane/v1beta2/types.go @@ -622,7 +625,7 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol, builder := iptables.NewRuleBuilder(iptChain) if isIngress { if ipset != "" { - builder = builder.MatchIPSetSrc(ipset) + builder = builder.MatchIPSetSrc(ipset, ipsetTypeHashIP) } else if ipnet != "" { builder = builder.MatchCIDRSrc(ipnet) } else { @@ -631,7 +634,7 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol, } } else { if ipset != "" { - builder = builder.MatchIPSetDst(ipset) + builder = builder.MatchIPSetDst(ipset, ipsetTypeHashIP) } else if ipnet != "" { builder = builder.MatchCIDRDst(ipnet) } else { @@ -648,8 +651,8 @@ func buildCoreIPTRule(ipProtocol iptables.Protocol, fallthrough case "sctp": builder = builder.MatchTransProtocol(transProtocol). - MatchSrcPort(service.SrcPort, service.SrcEndPort). - MatchDstPort(service.Port, service.EndPort) + MatchPortSrc(service.SrcPort, service.SrcEndPort). + MatchPortDst(service.Port, service.EndPort) case "icmp": builder = builder.MatchICMP(service.ICMPType, service.ICMPCode, ipProtocol) } @@ -673,8 +676,8 @@ func buildServiceIPTRules(ipProtocol iptables.Protocol, services []v1beta2.Servi fallthrough case "sctp": copiedBuilder = copiedBuilder.MatchTransProtocol(transProtocol). - MatchSrcPort(svc.SrcPort, svc.SrcEndPort). - MatchDstPort(svc.Port, svc.EndPort) + MatchPortSrc(svc.SrcPort, svc.SrcEndPort). + MatchPortDst(svc.Port, svc.EndPort) case "icmp": copiedBuilder = copiedBuilder.MatchICMP(svc.ICMPType, svc.ICMPCode, ipProtocol) } diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 143191b19f1..b69550cbfbd 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -570,7 +570,7 @@ func (p *proxier) installNodePortService(localGroupID, clusterGroupID binding.Gr }); err != nil { return fmt.Errorf("failed to install NodePort load balancing flows: %w", err) } - if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { + if err := p.routeClient.AddNodePortConf(p.nodePortAddresses, svcPort, protocol); err != nil { return fmt.Errorf("failed to install NodePort traffic redirecting rules: %w", err) } return nil @@ -587,7 +587,7 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove NodePort load balancing flows: %w", err) } - if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { + if err := p.routeClient.DeleteNodePortConf(p.nodePortAddresses, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err) } return nil @@ -602,6 +602,8 @@ func (p *proxier) installExternalIPService(svcInfoStr string, trafficPolicyLocal bool, affinityTimeout uint16, loadBalancerMode agentconfig.LoadBalancerMode) error { + externalIPs := make([]net.IP, 0, len(externalIPStrings)) + isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR for _, externalIP := range externalIPStrings { ip := net.ParseIP(externalIP) if err := p.ofClient.InstallServiceFlows(&agenttypes.ServiceConfig{ @@ -615,18 +617,28 @@ func (p *proxier) installExternalIPService(svcInfoStr string, IsExternal: true, IsNodePort: false, IsNested: false, // Unsupported for ExternalIP - IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, + IsDSR: isDSR, }); err != nil { return fmt.Errorf("failed to install ExternalIP load balancing flows: %w", err) } if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil { return fmt.Errorf("failed to install ExternalIP traffic redirecting routes: %w", err) } + externalIPs = append(externalIPs, ip) + } + if len(externalIPs) != 0 { + if err := p.routeClient.AddExternalIPConf(externalIPs, svcPort, protocol); err != nil { + return err + } } return nil } -func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPStrings []string, svcPort uint16, protocol binding.Protocol) error { +func (p *proxier) uninstallExternalIPService(svcInfoStr string, + externalIPStrings []string, + svcPort uint16, + protocol binding.Protocol) error { + externalIPs := make([]net.IP, 0, len(externalIPStrings)) for _, externalIP := range externalIPStrings { ip := net.ParseIP(externalIP) if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { @@ -635,6 +647,12 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil { return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err) } + externalIPs = append(externalIPs, ip) + } + if len(externalIPs) != 0 { + if err := p.routeClient.DeleteExternalIPConf(externalIPs, svcPort, protocol); err != nil { + return err + } } return nil } @@ -648,6 +666,8 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, trafficPolicyLocal bool, affinityTimeout uint16, loadBalancerMode agentconfig.LoadBalancerMode) error { + loadBalancerIPs := make([]net.IP, 0, len(loadBalancerIPStrings)) + isDSR := features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR for _, ingress := range loadBalancerIPStrings { if ingress != "" { ip := net.ParseIP(ingress) @@ -662,7 +682,7 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, IsExternal: true, IsNodePort: false, IsNested: false, // Unsupported for LoadBalancerIP - IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, + IsDSR: isDSR, }); err != nil { return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) } @@ -670,9 +690,16 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil { return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) } + loadBalancerIPs = append(loadBalancerIPs, ip) } } } + if p.proxyAll && len(loadBalancerIPs) != 0 { + if err := p.routeClient.AddLoadBalancerConf(loadBalancerIPs, svcPort, protocol); err != nil { + return err + } + } + return nil } @@ -693,7 +720,11 @@ func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn return nil } -func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { +func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, + loadBalancerIPStrings []string, + svcPort uint16, + protocol binding.Protocol) error { + loadBalancerIPs := make([]net.IP, 0, len(loadBalancerIPStrings)) for _, ingress := range loadBalancerIPStrings { if ingress != "" { ip := net.ParseIP(ingress) @@ -704,9 +735,15 @@ func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIP if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil { return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) } + loadBalancerIPs = append(loadBalancerIPs, ip) } } } + if p.proxyAll && len(loadBalancerIPs) != 0 { + if err := p.routeClient.DeleteLoadBalancerConf(loadBalancerIPs, svcPort, protocol); err != nil { + return err + } + } return nil } @@ -1391,6 +1428,7 @@ func newProxier( nodeIPChecker nodeip.Checker, nodePortAddresses []net.IP, proxyAllEnabled bool, + isKubeAPIServerOverridden bool, skipServices []string, proxyLoadBalancerIPs bool, defaultLoadBalancerMode agentconfig.LoadBalancerMode, @@ -1421,7 +1459,13 @@ func newProxier( } var serviceHealthServer healthcheck.ServiceHealthServer - if proxyAllEnabled { + if proxyAllEnabled && isKubeAPIServerOverridden { + // The serviceHealthServer of AntreaProxy should be initialized only when kube-proxy is removed. If kube-proxy + // is present, it also provides the serviceHealthServer functionality, and both servers would attempt to start + // an HTTP service on the same port in a K8s Node, causing conflicts. The option `kubeAPIServerOverride` + // in antrea-agent is used to determine if the serviceHealthServer of AntreaProxy should be initialized. The + // option must be set if kube-proxy is removed, though it can also be set when kube-proxy is present (not + // recommended and unnecessary). We assume this option is set only when kube-proxy is removed. nodePortAddressesString := make([]string, len(nodePortAddresses)) for i, address := range nodePortAddresses { nodePortAddressesString[i] = address.String() @@ -1532,6 +1576,7 @@ func newDualStackProxier( nodePortAddressesIPv4 []net.IP, nodePortAddressesIPv6 []net.IP, proxyAllEnabled bool, + isKubeAPIServerOverridden bool, skipServices []string, proxyLoadBalancerIPs bool, defaultLoadBalancerMode agentconfig.LoadBalancerMode, @@ -1553,6 +1598,7 @@ func newDualStackProxier( nodeIPChecker, nodePortAddressesIPv4, proxyAllEnabled, + isKubeAPIServerOverridden, skipServices, proxyLoadBalancerIPs, defaultLoadBalancerMode, @@ -1575,6 +1621,7 @@ func newDualStackProxier( nodeIPChecker, nodePortAddressesIPv6, proxyAllEnabled, + isKubeAPIServerOverridden, skipServices, proxyLoadBalancerIPs, defaultLoadBalancerMode, @@ -1607,6 +1654,7 @@ func NewProxier(hostname string, defaultLoadBalancerMode agentconfig.LoadBalancerMode, v4GroupCounter types.GroupCounter, v6GroupCounter types.GroupCounter, + isKubeAPIServerOverridden bool, nestedServiceSupport bool) (Proxier, error) { proxyAllEnabled := proxyConfig.ProxyAll skipServices := proxyConfig.SkipServices @@ -1630,6 +1678,7 @@ func NewProxier(hostname string, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAllEnabled, + isKubeAPIServerOverridden, skipServices, proxyLoadBalancerIPs, defaultLoadBalancerMode, @@ -1653,6 +1702,7 @@ func NewProxier(hostname string, nodeIPChecker, nodePortAddressesIPv4, proxyAllEnabled, + isKubeAPIServerOverridden, skipServices, proxyLoadBalancerIPs, defaultLoadBalancerMode, @@ -1675,6 +1725,7 @@ func NewProxier(hostname string, nodeIPChecker, nodePortAddressesIPv6, proxyAllEnabled, + isKubeAPIServerOverridden, skipServices, proxyLoadBalancerIPs, defaultLoadBalancerMode, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 8778b94842e..bea99300046 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -415,6 +415,7 @@ func newFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP fakeNodeIPChecker, nodePortAddresses, o.proxyAllEnabled, + true, []string{skippedServiceNN, skippedClusterIP}, o.proxyLoadBalancerIPs, o.defaultLoadBalancerMode, @@ -541,7 +542,8 @@ func testClusterIPAdd(t *testing.T, } } if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -746,10 +748,13 @@ func testLoadBalancerAdd(t *testing.T, } if proxyLoadBalancerIPs { mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) + } fp.syncProxyRules() @@ -906,9 +911,10 @@ func testNodePortAdd(t *testing.T, }).Times(1) } } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) } fp.syncProxyRules() @@ -1176,7 +1182,8 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { ClusterGroupID: clusterGroupID1, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIPv4}, uint16(port80Int32), binding.ProtocolTCP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIPv4).Times(1) localGroupID2 := fp.groupCounter.AllocateIfNotExist(svcPortName2, true) @@ -1210,7 +1217,8 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { ClusterGroupID: clusterGroupID2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIPv4}, uint16(port443Int32), binding.ProtocolTCP).Times(1) fp.syncProxyRules() @@ -1223,14 +1231,16 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port80Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIPv4}, uint16(port80Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port443Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIPv4}, uint16(port443Int32), binding.ProtocolTCP) // The route for the ClusterIP and the LoadBalancer IP should only be uninstalled once. mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIPv4) @@ -1531,8 +1541,10 @@ func testClusterIPRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) } if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) - mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) } if needClearConntrackEntries(bindingProtocol) { mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) @@ -1629,7 +1641,7 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: externalIP, @@ -1640,17 +1652,19 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) } mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) + mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) } if needClearConntrackEntries(bindingProtocol) { mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) @@ -1765,8 +1779,9 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) if externalIP != nil { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: externalIP, @@ -1777,7 +1792,8 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) } mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) @@ -1785,11 +1801,13 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) if externalIP != nil { mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) + mockRouteClient.EXPECT().DeleteExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) } if needClearConntrackEntries(bindingProtocol) { mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) @@ -2062,12 +2080,12 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net. IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort + 1), @@ -2085,7 +2103,7 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net. IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() } @@ -2168,15 +2186,17 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), gomock.Any()).Times(1) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), gomock.Any()).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort + 1), @@ -2203,8 +2223,9 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP ClusterGroupID: 2, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort+1), gomock.Any()).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() } @@ -2352,7 +2373,9 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, s IsExternal: true, ClusterGroupID: 1, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) @@ -2580,7 +2603,7 @@ func testServicePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2591,8 +2614,8 @@ func testServicePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2603,6 +2626,7 @@ func testServicePortUpdate(t *testing.T, IsExternal: true, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) s1 = mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) s2 = mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2615,7 +2639,9 @@ func testServicePortUpdate(t *testing.T, s2.After(s1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort+1), bindingProtocol).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -2706,10 +2732,10 @@ func testServiceNodePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) s2 := mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: vIP, ServicePort: uint16(svcNodePort + 1), @@ -2718,7 +2744,7 @@ func testServiceNodePortUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort+1), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort+1), bindingProtocol).Times(1) s2.After(s1) } if svcType == corev1.ServiceTypeLoadBalancer { @@ -2730,6 +2756,7 @@ func testServiceNodePortUpdate(t *testing.T, IsExternal: true, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) } fp.syncProxyRules() @@ -2826,6 +2853,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, IsExternal: true, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2836,7 +2864,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2847,6 +2875,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, IsExternal: true, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -2876,6 +2905,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, }).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddExternalIPConf([]net.IP{externalIP}, uint16(svcPort), bindingProtocol).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) @@ -2891,8 +2922,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, }).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) @@ -2909,6 +2940,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -3095,27 +3128,35 @@ func testServiceIngressIPsUpdate(t *testing.T, IsExternal: true, }).Times(1) } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) for _, ip := range loadBalancerIPs { mockRouteClient.EXPECT().AddExternalIPRoute(ip).Times(1) } - - toDeleteLoadBalancerIPs := smallSliceDifference(loadBalancerIPStrs, updatedLoadBalancerIPStrs) - toAddLoadBalancerIPs := smallSliceDifference(updatedLoadBalancerIPStrs, loadBalancerIPStrs) - for _, ipStr := range toDeleteLoadBalancerIPs { - mockOFClient.EXPECT().UninstallServiceFlows(net.ParseIP(ipStr), uint16(svcPort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().DeleteExternalIPRoute(net.ParseIP(ipStr)).Times(1) - } - for _, ipStr := range toAddLoadBalancerIPs { + mockRouteClient.EXPECT().AddLoadBalancerConf(loadBalancerIPs, uint16(svcPort), bindingProtocol).Times(1) + + toDeleteLoadBalancerIPStrings := smallSliceDifference(loadBalancerIPStrs, updatedLoadBalancerIPStrs) + toAddLoadBalancerIPStrings := smallSliceDifference(updatedLoadBalancerIPStrs, loadBalancerIPStrs) + var toDeleteLoadBalancerIPs, toAddLoadBalancerIPs []net.IP + for _, ipStr := range toDeleteLoadBalancerIPStrings { + ip := net.ParseIP(ipStr) + toDeleteLoadBalancerIPs = append(toDeleteLoadBalancerIPs, ip) + mockOFClient.EXPECT().UninstallServiceFlows(ip, uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPRoute(ip).Times(1) + } + mockRouteClient.EXPECT().DeleteLoadBalancerConf(toDeleteLoadBalancerIPs, uint16(svcPort), bindingProtocol).Times(1) + for _, ipStr := range toAddLoadBalancerIPStrings { + ip := net.ParseIP(ipStr) + toAddLoadBalancerIPs = append(toAddLoadBalancerIPs, ip) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: net.ParseIP(ipStr), + ServiceIP: ip, ServicePort: uint16(svcPort), Protocol: bindingProtocol, ClusterGroupID: 1, IsExternal: true, }).Times(1) - mockRouteClient.EXPECT().AddExternalIPRoute(net.ParseIP(ipStr)).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(ip).Times(1) } + mockRouteClient.EXPECT().AddLoadBalancerConf(toAddLoadBalancerIPs, uint16(svcPort), bindingProtocol).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -3212,9 +3253,9 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, IsNodePort: true, AffinityTimeout: uint16(affinitySeconds), }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: vIP, ServicePort: uint16(svcNodePort), @@ -3224,7 +3265,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, IsNodePort: true, AffinityTimeout: uint16(updatedAffinitySeconds), }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3236,8 +3277,10 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, IsExternal: true, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: loadBalancerIP, ServicePort: uint16(svcPort), @@ -3247,6 +3290,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, IsExternal: true, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) } fp.syncProxyRules() @@ -3351,7 +3395,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, IsExternal: true, IsNodePort: true, }).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3363,8 +3407,8 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, IsNodePort: true, AffinityTimeout: uint16(affinitySeconds), }).Times(1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3375,6 +3419,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, IsExternal: true, }).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3387,6 +3432,8 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, }).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddLoadBalancerConf([]net.IP{loadBalancerIP}, uint16(svcPort), bindingProtocol).Times(1) } fp.syncProxyRules() @@ -3607,7 +3654,7 @@ func TestGetServiceFlowKeys(t *testing.T) { makeEndpointSliceMap(fp, eps) } if tc.svc != nil && tc.eps != nil && tc.serviceInstalled { - mockRouteClient.EXPECT().AddNodePort(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP).Times(1) + mockRouteClient.EXPECT().AddNodePortConf(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 5355efbf3c6..45e3c62d1a3 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -76,11 +76,23 @@ type Interface interface { // DeleteEgressRule deletes the IP rule installed by AddEgressRule. DeleteEgressRule(tableID uint32, mark uint32) error - // AddNodePort adds configurations when a NodePort Service is created. - AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error + // AddNodePortConf adds configurations when a NodePort Service is created. + AddNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error - // DeleteNodePort deletes related configurations when a NodePort Service is deleted. - DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error + // DeleteNodePortConf deletes related configurations when a NodePort Service is deleted. + DeleteNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error + + // AddLoadBalancerConf adds configurations when a LoadBalancer Service is created. + AddLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol) error + + // DeleteLoadBalancerConf deletes related configurations when a LoadBalancer Service is deleted. + DeleteLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol) error + + // AddExternalIPConf adds configurations when an externalIP is created. + AddExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol) error + + // DeleteExternalIPConf deletes related configurations when an externalIP is deleted. + DeleteExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol) error // AddExternalIPRoute adds a route entry when an external IP is added. AddExternalIPRoute(externalIP net.IP) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index ac61dcd2d9d..ff60e28d6a1 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -21,12 +21,14 @@ import ( "reflect" "sort" "strconv" + "strings" "sync" "time" "github.com/containernetworking/plugins/pkg/ip" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -56,7 +58,7 @@ const ( // antreaPodIP6Set contains all Per-Node IPAM IPv6 Pod CIDRs of this cluster. antreaPodIP6Set = "ANTREA-POD-IP6" - // Antrea managed ipset. Max name length is 31 chars. + // Antrea managed ipsets. Max name length is 31 chars. // localAntreaFlexibleIPAMPodIPSet contains all AntreaFlexibleIPAM Pod IPs of this Node. localAntreaFlexibleIPAMPodIPSet = "LOCAL-FLEXIBLE-IPAM-POD-IP" // localAntreaFlexibleIPAMPodIP6Set contains all AntreaFlexibleIPAM Pod IPv6s of this Node. @@ -66,9 +68,13 @@ const ( // clusterNodeIP6Set contains all other Node IP6s in the cluster. clusterNodeIP6Set = "CLUSTER-NODE-IP6" - // Antrea proxy NodePort IP - antreaNodePortIPSet = "ANTREA-NODEPORT-IP" - antreaNodePortIP6Set = "ANTREA-NODEPORT-IP6" + // Antrea managed ipsets for Service. + antreaNodePortIPSet = "ANTREA-NODEPORT-IP" + antreaNodePortIP6Set = "ANTREA-NODEPORT-IP6" + antreaLoadBalancerIPSet = "ANTREA-LOADBALANCER-IP" + antreaLoadBalancerIP6Set = "ANTREA-LOADBALANCER-IP6" + antreaExternalIPIPSet = "ANTREA-EXTERNAL-IP" + antreaExternalIPIP6Set = "ANTREA-EXTERNAL-IP6" // Antrea managed iptables chains. antreaForwardChain = "ANTREA-FORWARD" @@ -78,6 +84,8 @@ const ( antreaOutputChain = "ANTREA-OUTPUT" antreaMangleChain = "ANTREA-MANGLE" + kubeProxyServiceChain = "KUBE-SERVICES" + serviceIPv4CIDRKey = "serviceIPv4CIDRKey" serviceIPv6CIDRKey = "serviceIPv6CIDRKey" @@ -115,6 +123,8 @@ type Client struct { // iptablesInitialized is used to notify when iptables initialization is done. iptablesInitialized chan struct{} proxyAll bool + kubeServiceHost string + kubeServicePort intstr.IntOrString connectUplinkToBridge bool multicastEnabled bool isCloudEKS bool @@ -123,10 +133,8 @@ type Client struct { serviceRoutes sync.Map // serviceNeighbors caches neighbors. serviceNeighbors sync.Map - // nodePortsIPv4 caches all existing IPv4 NodePorts. - nodePortsIPv4 sync.Map - // nodePortsIPv6 caches all existing IPv6 NodePorts. - nodePortsIPv6 sync.Map + // serviceIPSets caches ipsets about Services. + serviceIPSets map[string]*sync.Map // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster clusterNodeIPs sync.Map // clusterNodeIP6s stores the IPv6 address of all other Nodes in the cluster. It is maintained but not consumed @@ -146,7 +154,7 @@ type Client struct { nodeNetworkPolicyIPTablesIPv6 sync.Map // deterministic represents whether to write iptables chains and rules for NodeNetworkPolicy deterministically when // syncIPTables is called. Enabling it may carry a performance impact. It's disabled by default and should only be - // used in testing. + // used in testing.ยท deterministic bool } @@ -154,11 +162,12 @@ type Client struct { func NewClient(networkConfig *config.NetworkConfig, noSNAT bool, proxyAll bool, + isKubeAPIServerOverridden bool, connectUplinkToBridge bool, nodeNetworkPolicyEnabled bool, multicastEnabled bool, serviceCIDRProvider servicecidr.Interface) (*Client, error) { - return &Client{ + c := &Client{ networkConfig: networkConfig, noSNAT: noSNAT, proxyAll: proxyAll, @@ -169,7 +178,24 @@ func NewClient(networkConfig *config.NetworkConfig, netlink: &netlink.Handle{}, isCloudEKS: env.IsCloudEKS(), serviceCIDRProvider: serviceCIDRProvider, - }, nil + serviceIPSets: map[string]*sync.Map{ + antreaNodePortIPSet: {}, + antreaNodePortIP6Set: {}, + antreaLoadBalancerIPSet: {}, + antreaLoadBalancerIP6Set: {}, + antreaExternalIPIPSet: {}, + antreaExternalIPIP6Set: {}, + }, + } + if proxyAll && !isKubeAPIServerOverridden { + c.kubeServiceHost = env.GetKubeServiceHost() + if env.GetKubeServicePort() == 0 { + return nil, fmt.Errorf("kube Service port is invalid") + } + c.kubeServicePort = intstr.FromInt32(env.GetKubeServicePort()) + } + + return c, nil } // Initialize initializes all infrastructures required to route container packets in host network. @@ -179,7 +205,7 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { c.iptablesInitialized = make(chan struct{}) var err error - // Sets up the ipset that will be used in iptables. + // Sets up the ipsets that will be used in iptables. if err = c.syncIPSet(); err != nil { return fmt.Errorf("failed to initialize ipset: %v", err) } @@ -388,27 +414,19 @@ func (c *Client) syncIPSet() error { // AntreaProxy proxyAll is available in all traffic modes. If proxyAll is enabled, create the ipsets to store the // pairs of Node IP and NodePort. if c.proxyAll { - if err := c.ipset.CreateIPSet(antreaNodePortIPSet, ipset.HashIPPort, false); err != nil { - return err - } - if err := c.ipset.CreateIPSet(antreaNodePortIP6Set, ipset.HashIPPort, true); err != nil { - return err - } - - c.nodePortsIPv4.Range(func(k, _ interface{}) bool { - ipSetEntry := k.(string) - if err := c.ipset.AddEntry(antreaNodePortIPSet, ipSetEntry); err != nil { - return false - } - return true - }) - c.nodePortsIPv6.Range(func(k, _ interface{}) bool { - ipSetEntry := k.(string) - if err := c.ipset.AddEntry(antreaNodePortIP6Set, ipSetEntry); err != nil { - return false + for ipsetName, ipsetEntries := range c.serviceIPSets { + isIPv6 := isIP6Set(ipsetName) + if err := c.ipset.CreateIPSet(ipsetName, ipset.HashIPPort, isIPv6); err != nil { + return err } - return true - }) + ipsetEntries.Range(func(k, _ interface{}) bool { + ipsetEntry := k.(string) + if err := c.ipset.AddEntry(ipsetName, ipsetEntry); err != nil { + return false + } + return true + }) + } } // AntreaIPAM is available in noEncap mode. There is a validation in Antrea configuration about this traffic mode @@ -438,8 +456,8 @@ func (c *Client) syncIPSet() error { return true }) c.clusterNodeIP6s.Range(func(_, v interface{}) bool { - ipSetEntry := v.(string) - if err := c.ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + ipsetEntry := v.(string) + if err := c.ipset.AddEntry(clusterNodeIP6Set, ipsetEntry); err != nil { return false } return true @@ -494,6 +512,22 @@ func getNodePortIPSetName(isIPv6 bool) string { } } +func getLoadBalancerIPSetName(isIPv6 bool) string { + if isIPv6 { + return antreaLoadBalancerIP6Set + } else { + return antreaLoadBalancerIPSet + } +} + +func getExternalIPIPSetName(isIPv6 bool) string { + if isIPv6 { + return antreaExternalIPIP6Set + } else { + return antreaExternalIPIPSet + } +} + func getLocalAntreaFlexibleIPAMPodIPSetName(isIPv6 bool) string { if isIPv6 { return localAntreaFlexibleIPAMPodIP6Set @@ -562,43 +596,108 @@ func (c *Client) writeEKSNATRules(iptablesData *bytes.Buffer) { }...) } +func (c *Client) getIPProtocol() iptables.Protocol { + switch { + case c.networkConfig.IPv4Enabled && c.networkConfig.IPv6Enabled: + return iptables.ProtocolDual + case c.networkConfig.IPv6Enabled: + return iptables.ProtocolIPv6 + default: + return iptables.ProtocolIPv4 + } +} + +// Create the antrea managed chains and link them to built-in chains. +// We cannot use iptables-restore for these jump rules because there +// are non antrea managed rules in built-in chains. +type jumpRule struct { + table string + srcChain string + dstChain string + comment string + insert bool +} + +func (c *Client) ensureAntreaJumpRulePrecedence(protocol iptables.Protocol, jumpRule jumpRule) error { + // List all the existing rules of the table and the chain where the Antrea jump rule will be added. + allExistingRules, err := c.iptables.ListRules(protocol, jumpRule.table, jumpRule.srcChain) + if err != nil { + return err + } + + // Construct keywords to identify Antrea and kube-proxy jump rules. + antreaJumpRuleKeyword := fmt.Sprintf("-j %s", jumpRule.dstChain) + kubeProxyJumpRuleKeyword := fmt.Sprintf("-j %s", kubeProxyServiceChain) + + for ipProtocol, rules := range allExistingRules { + var antreaJumpRuleIndex, kubeProxyJumpRuleIndex = -1, -1 + + for index, rule := range rules { + // Check if the current rule is the Antrea jump rule to be added. + if strings.Contains(rule, antreaJumpRuleKeyword) { + antreaJumpRuleIndex = index + // Check if the current rule is the kube-proxy jump rule. + } else if strings.Contains(rule, kubeProxyJumpRuleKeyword) { + kubeProxyJumpRuleIndex = index + } + } + // If both the Antrea jump rule and the kube-proxy jump rule are found and the Antrea jump rule is installed + // after the kube-proxy jump rule, delete the existing Antrea jump rule. + if antreaJumpRuleIndex != -1 && kubeProxyJumpRuleIndex != -1 && antreaJumpRuleIndex > kubeProxyJumpRuleIndex { + ruleSpec := []string{"-j", jumpRule.dstChain, "-m", "comment", "--comment", jumpRule.comment} + if err := c.iptables.DeleteRule(ipProtocol, jumpRule.table, jumpRule.srcChain, ruleSpec); err != nil { + return err + } + } + } + return nil +} + // syncIPTables ensure that the iptables infrastructure we use is set up. // It's idempotent and can safely be called on every startup. func (c *Client) syncIPTables() error { - // Create the antrea managed chains and link them to built-in chains. - // We cannot use iptables-restore for these jump rules because there - // are non antrea managed rules in built-in chains. - type jumpRule struct { - table string - srcChain string - dstChain string - comment string - } + ipProtocol := c.getIPProtocol() jumpRules := []jumpRule{ - {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}, - {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, - {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules"}, - {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules"}, - {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules"}, // TODO: unify the chain naming style - {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, + {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", false}, + {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules", false}, + {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules", false}, + {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules", false}, // TODO: unify the chain naming style + {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, } if c.proxyAll || c.isCloudEKS { - jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}) + rule := jumpRule{iptables.NATTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", c.proxyAll == true} + if c.proxyAll { + if err := c.ensureAntreaJumpRulePrecedence(ipProtocol, rule); err != nil { + return err + } + } + jumpRules = append(jumpRules, rule) } if c.proxyAll { - jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}) + rule := jumpRule{iptables.NATTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", true} + if err := c.ensureAntreaJumpRulePrecedence(ipProtocol, rule); err != nil { + return err + } + jumpRules = append(jumpRules, rule) } if c.nodeNetworkPolicyEnabled { - jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.InputChain, antreaInputChain, "Antrea: jump to Antrea input rules"}) - jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}) + jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.InputChain, antreaInputChain, "Antrea: jump to Antrea input rules", false}) + jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}) } for _, rule := range jumpRules { - if err := c.iptables.EnsureChain(iptables.ProtocolDual, rule.table, rule.dstChain); err != nil { + if err := c.iptables.EnsureChain(ipProtocol, rule.table, rule.dstChain); err != nil { return err } ruleSpec := []string{"-j", rule.dstChain, "-m", "comment", "--comment", rule.comment} - if err := c.iptables.AppendRule(iptables.ProtocolDual, rule.table, rule.srcChain, ruleSpec); err != nil { - return err + if rule.insert { + if err := c.iptables.InsertRule(ipProtocol, rule.table, rule.srcChain, ruleSpec); err != nil { + return err + } + } else { + if err := c.iptables.AppendRule(ipProtocol, rule.table, rule.srcChain, ruleSpec); err != nil { + return err + } } } @@ -630,16 +729,48 @@ func (c *Client) syncIPTables() error { return true }) + var proxyAllIPTablesIPv4, proxyAllIPTablesIPv6 map[string]map[string][]string + if c.proxyAll { + if c.networkConfig.IPv4Enabled { + serviceIPv4CIDR, err := c.getServiceCIDR(false) + if err != nil { + return err + } + proxyAllIPTablesIPv4 = c.generateProxyAllRules( + antreaNodePortIPSet, + antreaLoadBalancerIPSet, + antreaExternalIPIPSet, + serviceIPv4CIDR, + config.VirtualNodePortDNATIPv4.String(), + config.VirtualServiceIPv4.String(), + false, + ) + } + if c.networkConfig.IPv6Enabled { + serviceIPv6CIDR, err := c.getServiceCIDR(true) + if err != nil { + return err + } + proxyAllIPTablesIPv6 = c.generateProxyAllRules( + antreaNodePortIP6Set, + antreaLoadBalancerIP6Set, + antreaExternalIPIP6Set, + serviceIPv6CIDR, + config.VirtualNodePortDNATIPv6.String(), + config.VirtualServiceIPv6.String(), + true, + ) + } + } + // Use iptables-restore to configure IPv4 settings. if c.networkConfig.IPv4Enabled { iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv4CIDR, antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, - antreaNodePortIPSet, clusterNodeIPSet, - config.VirtualNodePortDNATIPv4, - config.VirtualServiceIPv4, snatMarkToIPv4, + proxyAllIPTablesIPv4, nodeNetworkPolicyIPTablesIPv4, false) @@ -654,11 +785,9 @@ func (c *Client) syncIPTables() error { iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv6CIDR, antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, - antreaNodePortIP6Set, clusterNodeIP6Set, - config.VirtualNodePortDNATIPv6, - config.VirtualServiceIPv6, snatMarkToIPv6, + proxyAllIPTablesIPv6, nodeNetworkPolicyIPTablesIPv6, true) // Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables. @@ -670,14 +799,148 @@ func (c *Client) syncIPTables() error { return nil } +func (c *Client) generateProxyAllRules(nodePortIPSet string, + loadBalancerIPSet string, + externalIPIPSet string, + serviceCIDR string, + nodePortDNATVirtualIP string, + serviceVirtualIP string, + isIPv6 bool) map[string]map[string][]string { + rules := map[string]map[string][]string{ + iptables.NATTable: make(map[string][]string), + iptables.RawTable: make(map[string][]string), + } + var natPreroutingRules, natOutputRules []string + if c.kubeServiceHost != "" { + if isIPv6 && utilnet.IsIPv6String(c.kubeServiceHost) || !isIPv6 && utilnet.IsIPv4String(c.kubeServiceHost) { + natPreroutingRules = append(natPreroutingRules, + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: bypass external to kube Service traffic when kube Service Endpoint is not override"). + MatchCIDRDst(c.kubeServiceHost). + MatchTransProtocol(iptables.ProtocolTCP). + MatchPortDst(&c.kubeServicePort, nil). + SetTarget(kubeProxyServiceChain). + Done(). + GetRule()) + natOutputRules = append(natOutputRules, + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: bypass local to kube Service traffic when kube Service Endpoint is not override"). + MatchCIDRDst(c.kubeServiceHost). + MatchTransProtocol(iptables.ProtocolTCP). + MatchPortDst(&c.kubeServicePort, nil). + SetTarget(kubeProxyServiceChain). + Done(). + GetRule()) + } + } + if serviceCIDR != "" { + natPreroutingRules = append(natPreroutingRules, + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: accept external to ClusterIP packets"). + MatchCIDRDst(serviceCIDR). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule()) + natOutputRules = append(natOutputRules, + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: accept local to ClusterIP packets"). + MatchCIDRDst(serviceCIDR). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule()) + } + natPreroutingRules = append(natPreroutingRules, + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: DNAT external to NodePort packets"). + MatchIPSetDst(nodePortIPSet, ipset.HashIPPort). + SetTarget(iptables.DNATTarget). + SetTargetDNATToDst(nodePortDNATVirtualIP, nil). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: accept external to LoadBalancer packets"). + MatchIPSetDst(loadBalancerIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: accept external to externalIP packets"). + MatchIPSetDst(externalIPIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + ) + natOutputRules = append(natOutputRules, + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: DNAT local to NodePort packets"). + MatchIPSetDst(nodePortIPSet, ipset.HashIPPort). + SetTarget(iptables.DNATTarget). + SetTargetDNATToDst(nodePortDNATVirtualIP, nil). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: accept local to LoadBalancer packets"). + MatchIPSetDst(loadBalancerIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaOutputChain). + SetComment("Antrea: accept local to externalIP packets"). + MatchIPSetDst(externalIPIPSet, ipset.HashIPPort). + SetTarget(iptables.AcceptTarget). + Done(). + GetRule(), + ) + + rawPreroutingRules := []string{ + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: do not track external to LoadBalancer request packets"). + MatchIPSetDst(loadBalancerIPSet, ipset.HashIPPort). + SetTarget(iptables.NotrackTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: do not track external to externalIP request packets"). + MatchIPSetDst(externalIPIPSet, ipset.HashIPPort). + SetTarget(iptables.NotrackTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: do not track external to LoadBalancer reply packets"). + MatchIPSetSrc(loadBalancerIPSet, ipset.HashIPPort). + SetTarget(iptables.NotrackTarget). + Done(). + GetRule(), + iptables.NewRuleBuilder(antreaPreRoutingChain). + SetComment("Antrea: do not track external to externalIP reply packets"). + MatchIPSetSrc(externalIPIPSet, ipset.HashIPPort). + SetTarget(iptables.NotrackTarget). + Done(). + GetRule(), + } + natPostRoutingRules := []string{ + iptables.NewRuleBuilder(antreaPostRoutingChain). + SetComment("Antrea: masquerade OVS virtual source IP"). + MatchCIDRSrc(serviceVirtualIP). + SetTarget(iptables.MasqueradeTarget). + Done(). + GetRule(), + } + + rules[iptables.NATTable][antreaPreRoutingChain] = natPreroutingRules + rules[iptables.NATTable][antreaOutputChain] = natOutputRules + rules[iptables.NATTable][antreaPostRoutingChain] = natPostRoutingRules + rules[iptables.RawTable][antreaPreRoutingChain] = rawPreroutingRules + + return rules +} + func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, - nodePortIPSet, clusterNodeIPSet string, - nodePortDNATVirtualIP, - serviceVirtualIP net.IP, snatMarkToIP map[uint32]net.IP, + proxyAllIPTables map[string]map[string][]string, nodeNetWorkPolicyIPTables map[string][]string, isIPv6 bool) *bytes.Buffer { // Create required rules in the antrea chains. @@ -731,6 +994,11 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, }...) } } + if c.proxyAll { + for _, ruleStr := range proxyAllIPTables[iptables.RawTable][antreaPreRoutingChain] { + writeLine(iptablesData, ruleStr) + } + } writeLine(iptablesData, "COMMIT") // Write head lines anyway so the undesired rules can be deleted when noEncap -> encap. @@ -822,21 +1090,13 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, writeLine(iptablesData, iptables.MakeChainLine(antreaPreRoutingChain)) } if c.proxyAll { - writeLine(iptablesData, []string{ - "-A", antreaPreRoutingChain, - "-m", "comment", "--comment", `"Antrea: DNAT external to NodePort packets"`, - "-m", "set", "--match-set", nodePortIPSet, "dst,dst", - "-j", iptables.DNATTarget, - "--to-destination", nodePortDNATVirtualIP.String(), - }...) + for _, ruleStr := range proxyAllIPTables[iptables.NATTable][antreaPreRoutingChain] { + writeLine(iptablesData, ruleStr) + } writeLine(iptablesData, iptables.MakeChainLine(antreaOutputChain)) - writeLine(iptablesData, []string{ - "-A", antreaOutputChain, - "-m", "comment", "--comment", `"Antrea: DNAT local to NodePort packets"`, - "-m", "set", "--match-set", nodePortIPSet, "dst,dst", - "-j", iptables.DNATTarget, - "--to-destination", nodePortDNATVirtualIP.String(), - }...) + for _, ruleStr := range proxyAllIPTables[iptables.NATTable][antreaOutputChain] { + writeLine(iptablesData, ruleStr) + } } writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain)) // The masqueraded multicast traffic will become unicast so we @@ -890,12 +1150,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, // If AntreaProxy full support is enabled, it SNATs the packets whose source IP is VirtualServiceIPv4/VirtualServiceIPv6 // so the packets can be routed back to this Node. if c.proxyAll { - writeLine(iptablesData, []string{ - "-A", antreaPostRoutingChain, - "-m", "comment", "--comment", `"Antrea: masquerade OVS virtual source IP"`, - "-s", serviceVirtualIP.String(), - "-j", iptables.MasqueradeTarget, - }...) + for _, ruleStr := range proxyAllIPTables[iptables.NATTable][antreaPostRoutingChain] { + writeLine(iptablesData, ruleStr) + } } // This generates the rule to masquerade the packets destined for a hostPort whose backend is an AntreaIPAM VLAN Pod. @@ -1614,50 +1871,77 @@ func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error { return nil } -// AddNodePort is used to add IP,port:protocol entries to target ip set when a NodePort Service is added. An entry is added -// for every NodePort IP. -func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - isIPv6 := isIPv6Protocol(protocol) +func (c *Client) addServiceIPSetEntries(ips []net.IP, port uint16, protocol binding.Protocol, ipsetName string) error { transProtocol := getTransProtocolStr(protocol) - ipSetName := getNodePortIPSetName(isIPv6) - - for i := range nodePortAddresses { - ipSetEntry := fmt.Sprintf("%s,%s:%d", nodePortAddresses[i], transProtocol, port) - if err := c.ipset.AddEntry(ipSetName, ipSetEntry); err != nil { + for i := range ips { + ipsetEntry := fmt.Sprintf("%s,%s:%d", ips[i].String(), transProtocol, port) + if err := c.ipset.AddEntry(ipsetName, ipsetEntry); err != nil { return err } - if isIPv6 { - c.nodePortsIPv6.Store(ipSetEntry, struct{}{}) - } else { - c.nodePortsIPv4.Store(ipSetEntry, struct{}{}) - } - klog.V(4).InfoS("Added ipset for NodePort", "IP", nodePortAddresses[i], "Port", port, "Protocol", protocol) + c.serviceIPSets[ipsetName].Store(ipsetEntry, struct{}{}) + klog.V(4).InfoS("Added ipset entry for Service", "IP", ips[i], "Port", port, "Protocol", protocol) } return nil } -// DeleteNodePort is used to delete related IP set entries when a NodePort Service is deleted. -func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { - isIPv6 := isIPv6Protocol(protocol) +func (c *Client) deleteServiceIPSetEntries(ips []net.IP, port uint16, protocol binding.Protocol, ipsetName string) error { transProtocol := getTransProtocolStr(protocol) - ipSetName := getNodePortIPSetName(isIPv6) - - for i := range nodePortAddresses { - ipSetEntry := fmt.Sprintf("%s,%s:%d", nodePortAddresses[i], transProtocol, port) - if err := c.ipset.DelEntry(ipSetName, ipSetEntry); err != nil { + for i := range ips { + ipsetEntry := fmt.Sprintf("%s,%s:%d", ips[i].String(), transProtocol, port) + if err := c.ipset.DelEntry(ipsetName, ipsetEntry); err != nil { return err } - if isIPv6 { - c.nodePortsIPv6.Delete(ipSetEntry) - } else { - c.nodePortsIPv4.Delete(ipSetEntry) - } + c.serviceIPSets[ipsetName].Delete(ipsetEntry) } - return nil } +// AddNodePortConf is used to add IP,protocol:port entries to target ipset when a NodePort Service is added. An entry is +// added for every NodePort IP. +func (c *Client) AddNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getNodePortIPSetName(isIPv6) + return c.addServiceIPSetEntries(nodePortIPs, port, protocol, ipsetName) +} + +// DeleteNodePortConf is used to delete related ipset entries when a NodePort Service is deleted. +func (c *Client) DeleteNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getNodePortIPSetName(isIPv6) + return c.deleteServiceIPSetEntries(nodePortIPs, port, protocol, ipsetName) +} + +// AddLoadBalancerConf is used to add IP,protocol:port entries to target ipset when a LoadBalancer Service is added. An +// entry is added for every LoadBalancer IP. +func (c *Client) AddLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getLoadBalancerIPSetName(isIPv6) + return c.addServiceIPSetEntries(loadBalancerIPs, port, protocol, ipsetName) +} + +// DeleteLoadBalancerConf is used to delete related ipset entries when a LoadBalancer Service is deleted. +func (c *Client) DeleteLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getLoadBalancerIPSetName(isIPv6) + return c.deleteServiceIPSetEntries(loadBalancerIPs, port, protocol, ipsetName) +} + +// AddExternalIPConf is used to add IP,protocol:port entries to target ipset when a Service with externalIPs is added. +// An entry is added for every externalIP. +func (c *Client) AddExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getExternalIPIPSetName(isIPv6) + return c.addServiceIPSetEntries(externalIPs, port, protocol, ipsetName) +} + +// DeleteExternalIPConf is used to delete related ipset entries when a Service with externalIPs is deleted. +func (c *Client) DeleteExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol) error { + isIPv6 := isIPv6Protocol(protocol) + ipsetName := getExternalIPIPSetName(isIPv6) + return c.deleteServiceIPSetEntries(externalIPs, port, protocol, ipsetName) +} + func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error { isIPv6 := utilnet.IsIPv6(serviceCIDR.IP) linkIndex := c.nodeConfig.GatewayConfig.LinkIndex @@ -1814,9 +2098,9 @@ func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error continue } } - ipSetEntry := podAddresses[i].String() + ipsetEntry := podAddresses[i].String() ipSetName := getLocalAntreaFlexibleIPAMPodIPSetName(isIPv6) - if err := c.ipset.AddEntry(ipSetName, ipSetEntry); err != nil { + if err := c.ipset.AddEntry(ipSetName, ipsetEntry); err != nil { return err } } @@ -1830,9 +2114,9 @@ func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) err } for i := range podAddresses { isIPv6 := podAddresses[i].To4() == nil - ipSetEntry := podAddresses[i].String() + ipsetEntry := podAddresses[i].String() ipSetName := getLocalAntreaFlexibleIPAMPodIPSetName(isIPv6) - if err := c.ipset.DelEntry(ipSetName, ipSetEntry); err != nil { + if err := c.ipset.DelEntry(ipSetName, ipsetEntry); err != nil { return err } } @@ -1848,17 +2132,17 @@ func (c *Client) addNodeIP(podCIDR *net.IPNet, nodeIP net.IP) error { if nodeIP == nil { return nil } - ipSetEntry := nodeIP.String() + ipsetEntry := nodeIP.String() if nodeIP.To4() != nil { - if err := c.ipset.AddEntry(clusterNodeIPSet, ipSetEntry); err != nil { + if err := c.ipset.AddEntry(clusterNodeIPSet, ipsetEntry); err != nil { return err } - c.clusterNodeIPs.Store(podCIDR.String(), ipSetEntry) + c.clusterNodeIPs.Store(podCIDR.String(), ipsetEntry) } else { - if err := c.ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + if err := c.ipset.AddEntry(clusterNodeIP6Set, ipsetEntry); err != nil { return err } - c.clusterNodeIP6s.Store(podCIDR.String(), ipSetEntry) + c.clusterNodeIP6s.Store(podCIDR.String(), ipsetEntry) } return nil } @@ -1876,8 +2160,8 @@ func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error { if !exists { return nil } - ipSetEntry := obj.(string) - if err := c.ipset.DelEntry(clusterNodeIPSet, ipSetEntry); err != nil { + ipsetEntry := obj.(string) + if err := c.ipset.DelEntry(clusterNodeIPSet, ipsetEntry); err != nil { return err } c.clusterNodeIPs.Delete(podCIDRStr) @@ -1886,8 +2170,8 @@ func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error { if !exists { return nil } - ipSetEntry := obj.(string) - if err := c.ipset.DelEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + ipsetEntry := obj.(string) + if err := c.ipset.DelEntry(clusterNodeIP6Set, ipsetEntry); err != nil { return err } c.clusterNodeIP6s.Delete(podCIDRStr) @@ -2116,3 +2400,21 @@ func (c *Client) DeleteNodeNetworkPolicyIPTables(iptablesChains []string, isIPv6 return nil } + +func (c *Client) getServiceCIDR(isIPv6 bool) (string, error) { + var serviceCIDR string + serviceCIDRs, err := c.serviceCIDRProvider.GetServiceCIDRs() + if err != nil { + return "", fmt.Errorf("failed to get Service CIDRs from serviceCIDRProvider: %w", err) + } + for _, cidr := range serviceCIDRs { + if !isIPv6 && utilnet.IsIPv4CIDR(cidr) || isIPv6 && utilnet.IsIPv6CIDR(cidr) { + serviceCIDR = cidr.String() + } + } + return serviceCIDR, nil +} + +func isIP6Set(ipsetName string) bool { + return strings.Contains(ipsetName, "IP6") +} diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index 5104f81c587..181fed747d5 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -24,7 +24,9 @@ import ( "github.com/vishvananda/netlink" "go.uber.org/mock/gomock" "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + utilnet "k8s.io/utils/net" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow" @@ -52,6 +54,15 @@ var ( ipv4Route2 = generateRoute(net.ParseIP(externalIPv4Addr2), 32, config.VirtualServiceIPv4, 10, netlink.SCOPE_UNIVERSE) ipv6Route1 = generateRoute(net.ParseIP(externalIPv6Addr1), 128, config.VirtualServiceIPv6, 10, netlink.SCOPE_UNIVERSE) ipv6Route2 = generateRoute(net.ParseIP(externalIPv6Addr2), 128, config.VirtualServiceIPv6, 10, netlink.SCOPE_UNIVERSE) + + serviceIPSets = map[string]*sync.Map{ + antreaNodePortIPSet: {}, + antreaNodePortIP6Set: {}, + antreaLoadBalancerIPSet: {}, + antreaLoadBalancerIP6Set: {}, + antreaExternalIPIPSet: {}, + antreaExternalIPIP6Set: {}, + } ) func TestSyncRoutes(t *testing.T) { @@ -157,8 +168,7 @@ func TestSyncIPSet(t *testing.T) { nodeNetworkPolicyEnabled bool networkConfig *config.NetworkConfig nodeConfig *config.NodeConfig - nodePortsIPv4 []string - nodePortsIPv6 []string + serviceIPSets map[string][]string clusterNodeIPs map[string]string clusterNodeIP6s map[string]string nodeNetworkPolicyIPSetsIPv4 map[string]sets.Set[string] @@ -204,8 +214,14 @@ func TestSyncIPSet(t *testing.T) { PodIPv4CIDR: podCIDR, PodIPv6CIDR: podCIDRv6, }, - nodePortsIPv4: []string{"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"}, - nodePortsIPv6: []string{"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"}, + serviceIPSets: map[string][]string{ + antreaNodePortIPSet: {"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"}, + antreaNodePortIP6Set: {"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"}, + antreaLoadBalancerIPSet: {"192.168.0.150,tcp:80", "192.168.0.151,tcp:443"}, + antreaLoadBalancerIP6Set: {"2001::192:168:0:150,tcp:80", "2001::192:168:0:151,tcp:443"}, + antreaExternalIPIPSet: {"192.168.0.200,tcp:80", "192.168.0.201,tcp:443"}, + antreaExternalIPIP6Set: {"2001::192:168:0:200,tcp:80", "2001::192:168:0:201,tcp:443"}, + }, clusterNodeIPs: map[string]string{"172.16.3.0/24": "192.168.0.3", "172.16.4.0/24": "192.168.0.4"}, clusterNodeIP6s: map[string]string{"2001:ab03:cd04:5503::/64": "fe80::e643:4bff:fe03", "2001:ab03:cd04:5504::/64": "fe80::e643:4bff:fe04"}, nodeNetworkPolicyIPSetsIPv4: map[string]sets.Set[string]{"ANTREA-POL-RULE1-4": sets.New[string]("1.1.1.1/32", "2.2.2.2/32")}, @@ -215,18 +231,35 @@ func TestSyncIPSet(t *testing.T) { mockIPSet.CreateIPSet(antreaPodIP6Set, ipset.HashNet, true) mockIPSet.AddEntry(antreaPodIPSet, podCIDRStr) mockIPSet.AddEntry(antreaPodIP6Set, podCIDRv6Str) + mockIPSet.CreateIPSet(antreaNodePortIPSet, ipset.HashIPPort, false) mockIPSet.CreateIPSet(antreaNodePortIP6Set, ipset.HashIPPort, true) mockIPSet.AddEntry(antreaNodePortIPSet, "192.168.0.2,tcp:10000") mockIPSet.AddEntry(antreaNodePortIPSet, "127.0.0.1,tcp:10000") mockIPSet.AddEntry(antreaNodePortIP6Set, "fe80::e643:4bff:fe44:ee,tcp:10000") mockIPSet.AddEntry(antreaNodePortIP6Set, "::1,tcp:10000") + + mockIPSet.CreateIPSet(antreaLoadBalancerIPSet, ipset.HashIPPort, false) + mockIPSet.CreateIPSet(antreaLoadBalancerIP6Set, ipset.HashIPPort, true) + mockIPSet.AddEntry(antreaLoadBalancerIPSet, "192.168.0.150,tcp:80") + mockIPSet.AddEntry(antreaLoadBalancerIPSet, "192.168.0.151,tcp:443") + mockIPSet.AddEntry(antreaLoadBalancerIP6Set, "2001::192:168:0:150,tcp:80") + mockIPSet.AddEntry(antreaLoadBalancerIP6Set, "2001::192:168:0:151,tcp:443") + + mockIPSet.CreateIPSet(antreaExternalIPIPSet, ipset.HashIPPort, false) + mockIPSet.CreateIPSet(antreaExternalIPIP6Set, ipset.HashIPPort, true) + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.200,tcp:80") + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.201,tcp:443") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:200,tcp:80") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:201,tcp:443") + mockIPSet.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false) mockIPSet.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true) mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.3") mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.4") mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe03") mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe04") + mockIPSet.CreateIPSet("ANTREA-POL-RULE1-4", ipset.HashNet, false) mockIPSet.CreateIPSet("ANTREA-POL-RULE1-6", ipset.HashNet, true) mockIPSet.AddEntry("ANTREA-POL-RULE1-4", "1.1.1.1/32") @@ -268,16 +301,15 @@ func TestSyncIPSet(t *testing.T) { multicastEnabled: tt.multicastEnabled, connectUplinkToBridge: tt.connectUplinkToBridge, nodeNetworkPolicyEnabled: tt.nodeNetworkPolicyEnabled, - nodePortsIPv4: sync.Map{}, - nodePortsIPv6: sync.Map{}, clusterNodeIPs: sync.Map{}, clusterNodeIP6s: sync.Map{}, + serviceIPSets: make(map[string]*sync.Map), } - for _, nodePortIPv4 := range tt.nodePortsIPv4 { - c.nodePortsIPv4.Store(nodePortIPv4, struct{}{}) - } - for _, nodePortIPv6 := range tt.nodePortsIPv6 { - c.nodePortsIPv6.Store(nodePortIPv6, struct{}{}) + for ipsetName, ipsetEntries := range tt.serviceIPSets { + c.serviceIPSets[ipsetName] = &sync.Map{} + for _, entry := range ipsetEntries { + c.serviceIPSets[ipsetName].Store(entry, struct{}{}) + } } for cidr, nodeIP := range tt.clusterNodeIPs { c.clusterNodeIPs.Store(cidr, nodeIP) @@ -307,8 +339,6 @@ func TestSyncIPTables(t *testing.T) { nodeNetworkPolicyEnabled bool networkConfig *config.NetworkConfig nodeConfig *config.NodeConfig - nodePortsIPv4 []string - nodePortsIPv6 []string markToSNATIP map[uint32]string expectedCalls func(iptables *iptablestest.MockInterfaceMockRecorder) }{ @@ -347,10 +377,34 @@ func TestSyncIPTables(t *testing.T) { mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.PreRoutingChain, []string{"-j", antreaMangleChain, "-m", "comment", "--comment", "Antrea: jump to Antrea mangle rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.MangleTable, antreaOutputChain) mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.ListRules(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain).Return( + map[iptables.Protocol][]string{ + iptables.ProtocolIPv4: { + "-A " + iptables.PreRoutingChain + " -j " + kubeProxyServiceChain, + "-A " + iptables.PreRoutingChain + " -j " + antreaPreRoutingChain, + }, + iptables.ProtocolIPv6: { + "-A " + iptables.PreRoutingChain + " -j " + antreaPreRoutingChain, + "-A " + iptables.PreRoutingChain + " -j " + kubeProxyServiceChain, + }, + }, nil) + mockIPTables.DeleteRule(iptables.ProtocolIPv4, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) + mockIPTables.ListRules(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain).Return( + map[iptables.Protocol][]string{ + iptables.ProtocolIPv4: { + "-A " + iptables.OutputChain + " -j " + antreaOutputChain, + "-A " + iptables.OutputChain + " -j " + kubeProxyServiceChain, + }, + iptables.ProtocolIPv6: { + "-A " + iptables.OutputChain + " -j " + kubeProxyServiceChain, + "-A " + iptables.OutputChain + " -j " + antreaOutputChain, + }, + }, nil) + mockIPTables.DeleteRule(iptables.ProtocolIPv6, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaPreRoutingChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) + mockIPTables.InsertRule(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaOutputChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.InsertRule(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaInputChain) mockIPTables.AppendRule(iptables.ProtocolDual, iptables.FilterTable, iptables.InputChain, []string{"-j", antreaInputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea input rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaOutputChain) @@ -361,6 +415,10 @@ func TestSyncIPTables(t *testing.T) { -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK -A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP src -d 224.0.0.0/4 -j DROP +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to LoadBalancer request packets" -m set --match-set ANTREA-LOADBALANCER-IP dst,dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to externalIP request packets" -m set --match-set ANTREA-EXTERNAL-IP dst,dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to LoadBalancer reply packets" -m set --match-set ANTREA-LOADBALANCER-IP src,src -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to externalIP reply packets" -m set --match-set ANTREA-EXTERNAL-IP src,src -j NOTRACK COMMIT *mangle :ANTREA-MANGLE - [0:0] @@ -389,9 +447,17 @@ COMMIT COMMIT *nat :ANTREA-PREROUTING - [0:0] +-A ANTREA-PREROUTING -m comment --comment "Antrea: bypass external to kube Service traffic when kube Service Endpoint is not override" -d 10.96.0.1 -p tcp --dport 443 -j KUBE-SERVICES +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to ClusterIP packets" -d 10.96.0.0/12 -j ACCEPT -A ANTREA-PREROUTING -m comment --comment "Antrea: DNAT external to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP dst,dst -j DNAT --to-destination 169.254.0.252 +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP dst,dst -j ACCEPT +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP dst,dst -j ACCEPT :ANTREA-OUTPUT - [0:0] +-A ANTREA-OUTPUT -m comment --comment "Antrea: bypass local to kube Service traffic when kube Service Endpoint is not override" -d 10.96.0.1 -p tcp --dport 443 -j KUBE-SERVICES +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to ClusterIP packets" -d 10.96.0.0/12 -j ACCEPT -A ANTREA-OUTPUT -m comment --comment "Antrea: DNAT local to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP dst,dst -j DNAT --to-destination 169.254.0.252 +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP dst,dst -j ACCEPT +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP dst,dst -j ACCEPT :ANTREA-POSTROUTING - [0:0] -A ANTREA-POSTROUTING -m comment --comment "Antrea: SNAT Pod to external packets" ! -o antrea-gw0 -m mark --mark 0x00000001/0x000000ff -j SNAT --to 1.1.1.1 -A ANTREA-POSTROUTING -m comment --comment "Antrea: masquerade Pod to external packets" -s 172.16.10.0/24 -m set ! --match-set ANTREA-POD-IP dst ! -o antrea-gw0 -j MASQUERADE @@ -404,6 +470,10 @@ COMMIT :ANTREA-OUTPUT - [0:0] -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to LoadBalancer request packets" -m set --match-set ANTREA-LOADBALANCER-IP6 dst,dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to externalIP request packets" -m set --match-set ANTREA-EXTERNAL-IP6 dst,dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to LoadBalancer reply packets" -m set --match-set ANTREA-LOADBALANCER-IP6 src,src -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to externalIP reply packets" -m set --match-set ANTREA-EXTERNAL-IP6 src,src -j NOTRACK COMMIT *mangle :ANTREA-MANGLE - [0:0] @@ -432,9 +502,15 @@ COMMIT COMMIT *nat :ANTREA-PREROUTING - [0:0] +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to ClusterIP packets" -d fec0::/64 -j ACCEPT -A ANTREA-PREROUTING -m comment --comment "Antrea: DNAT external to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP6 dst,dst -j DNAT --to-destination fc01::aabb:ccdd:eefe +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP6 dst,dst -j ACCEPT +-A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP6 dst,dst -j ACCEPT :ANTREA-OUTPUT - [0:0] +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to ClusterIP packets" -d fec0::/64 -j ACCEPT -A ANTREA-OUTPUT -m comment --comment "Antrea: DNAT local to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP6 dst,dst -j DNAT --to-destination fc01::aabb:ccdd:eefe +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to LoadBalancer packets" -m set --match-set ANTREA-LOADBALANCER-IP6 dst,dst -j ACCEPT +-A ANTREA-OUTPUT -m comment --comment "Antrea: accept local to externalIP packets" -m set --match-set ANTREA-EXTERNAL-IP6 dst,dst -j ACCEPT :ANTREA-POSTROUTING - [0:0] -A ANTREA-POSTROUTING -m comment --comment "Antrea: SNAT Pod to external packets" ! -o antrea-gw0 -m mark --mark 0x00000002/0x000000ff -j SNAT --to fe80::e643:4bff:fe02 -A ANTREA-POSTROUTING -m comment --comment "Antrea: masquerade Pod to external packets" -s 2001:ab03:cd04:55ef::/64 -m set ! --match-set ANTREA-POD-IP6 dst ! -o antrea-gw0 -j MASQUERADE @@ -540,18 +616,18 @@ COMMIT }, }, expectedCalls: func(mockIPTables *iptablestest.MockInterfaceMockRecorder) { - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.RawTable, antreaPreRoutingChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.RawTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.RawTable, antreaOutputChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.RawTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaForwardChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.FilterTable, iptables.ForwardChain, []string{"-j", antreaForwardChain, "-m", "comment", "--comment", "Antrea: jump to Antrea forwarding rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaPostRoutingChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.PostRoutingChain, []string{"-j", antreaPostRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea postrouting rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.MangleTable, antreaMangleChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.PreRoutingChain, []string{"-j", antreaMangleChain, "-m", "comment", "--comment", "Antrea: jump to Antrea mangle rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.MangleTable, antreaOutputChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.RawTable, antreaPreRoutingChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.RawTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.RawTable, antreaOutputChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.RawTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.FilterTable, antreaForwardChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.FilterTable, iptables.ForwardChain, []string{"-j", antreaForwardChain, "-m", "comment", "--comment", "Antrea: jump to Antrea forwarding rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.NATTable, antreaPostRoutingChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.NATTable, iptables.PostRoutingChain, []string{"-j", antreaPostRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea postrouting rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.MangleTable, antreaMangleChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.MangleTable, iptables.PreRoutingChain, []string{"-j", antreaMangleChain, "-m", "comment", "--comment", "Antrea: jump to Antrea mangle rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.MangleTable, antreaOutputChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.MangleTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.Restore(`*raw :ANTREA-PREROUTING - [0:0] :ANTREA-OUTPUT - [0:0] @@ -593,6 +669,14 @@ COMMIT nodeNetworkPolicyEnabled: tt.nodeNetworkPolicyEnabled, deterministic: true, } + if tt.proxyAll { + mockServiceCIDRProvider := servicecidrtest.NewMockInterface(ctrl) + c.serviceCIDRProvider = mockServiceCIDRProvider + serviceCIDRs, _ := utilnet.ParseCIDRs([]string{"10.96.0.0/12", "fec0::/64"}) + mockServiceCIDRProvider.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil).Times(2) + c.kubeServiceHost = "10.96.0.1" + c.kubeServicePort = intstr.FromInt32(int32(443)) + } for mark, snatIP := range tt.markToSNATIP { c.markToSNATIP.Store(mark, net.ParseIP(snatIP)) } @@ -1315,7 +1399,7 @@ func TestDeleteSNATRule(t *testing.T) { } } -func TestAddNodePort(t *testing.T) { +func TestAddNodePortConf(t *testing.T) { tests := []struct { name string nodePortAddresses []net.IP @@ -1354,14 +1438,22 @@ func TestAddNodePort(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) ipset := ipsettest.NewMockInterface(ctrl) - c := &Client{ipset: ipset} + c := &Client{ipset: ipset, + serviceIPSets: map[string]*sync.Map{ + antreaNodePortIPSet: {}, + antreaNodePortIP6Set: {}, + antreaLoadBalancerIPSet: {}, + antreaLoadBalancerIP6Set: {}, + antreaExternalIPIPSet: {}, + antreaExternalIPIP6Set: {}, + }} tt.expectedCalls(ipset.EXPECT()) - assert.NoError(t, c.AddNodePort(tt.nodePortAddresses, tt.port, tt.protocol)) + assert.NoError(t, c.AddNodePortConf(tt.nodePortAddresses, tt.port, tt.protocol)) }) } } -func TestDeleteNodePort(t *testing.T) { +func TestDeleteNodePortConf(t *testing.T) { tests := []struct { name string nodePortAddresses []net.IP @@ -1400,9 +1492,194 @@ func TestDeleteNodePort(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) ipset := ipsettest.NewMockInterface(ctrl) - c := &Client{ipset: ipset} + c := &Client{ipset: ipset, serviceIPSets: serviceIPSets} + tt.expectedCalls(ipset.EXPECT()) + assert.NoError(t, c.DeleteNodePortConf(tt.nodePortAddresses, tt.port, tt.protocol)) + }) + } +} + +func TestAddLoadBalancerConf(t *testing.T) { + tests := []struct { + name string + loadBalancerIPs []net.IP + port uint16 + protocol binding.Protocol + expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) + }{ + { + name: "ipv4 tcp", + loadBalancerIPs: []net.IP{ + net.ParseIP("1.1.1.1"), + net.ParseIP("1.1.2.2"), + }, + port: 80, + protocol: binding.ProtocolTCP, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.AddEntry(antreaLoadBalancerIPSet, "1.1.1.1,tcp:80") + ipset.AddEntry(antreaLoadBalancerIPSet, "1.1.2.2,tcp:80") + }, + }, + { + name: "ipv6 udp", + loadBalancerIPs: []net.IP{ + net.ParseIP("fd00:1234:5678:dead:beaf::1"), + net.ParseIP("fd00:1234:5678:dead:beaf::2"), + }, + port: 8080, + protocol: binding.ProtocolUDPv6, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.AddEntry(antreaLoadBalancerIP6Set, "fd00:1234:5678:dead:beaf::1,udp:8080") + ipset.AddEntry(antreaLoadBalancerIP6Set, "fd00:1234:5678:dead:beaf::2,udp:8080") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + ipset := ipsettest.NewMockInterface(ctrl) + c := &Client{ipset: ipset, serviceIPSets: serviceIPSets} + tt.expectedCalls(ipset.EXPECT()) + assert.NoError(t, c.AddLoadBalancerConf(tt.loadBalancerIPs, tt.port, tt.protocol)) + }) + } +} + +func TestDeleteLoadBalancerConf(t *testing.T) { + tests := []struct { + name string + loadBalancerIPs []net.IP + port uint16 + protocol binding.Protocol + expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) + }{ + { + name: "ipv4 tcp", + loadBalancerIPs: []net.IP{ + net.ParseIP("1.1.1.1"), + net.ParseIP("1.1.2.2"), + }, + port: 80, + protocol: binding.ProtocolTCP, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.DelEntry(antreaLoadBalancerIPSet, "1.1.1.1,tcp:80") + ipset.DelEntry(antreaLoadBalancerIPSet, "1.1.2.2,tcp:80") + }, + }, + { + name: "ipv6 udp", + loadBalancerIPs: []net.IP{ + net.ParseIP("fd00:1234:5678:dead:beaf::1"), + net.ParseIP("fd00:1234:5678:dead:beaf::2"), + }, + port: 8080, + protocol: binding.ProtocolUDPv6, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.DelEntry(antreaLoadBalancerIP6Set, "fd00:1234:5678:dead:beaf::1,udp:8080") + ipset.DelEntry(antreaLoadBalancerIP6Set, "fd00:1234:5678:dead:beaf::2,udp:8080") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + ipset := ipsettest.NewMockInterface(ctrl) + c := &Client{ipset: ipset, serviceIPSets: serviceIPSets} + tt.expectedCalls(ipset.EXPECT()) + assert.NoError(t, c.DeleteLoadBalancerConf(tt.loadBalancerIPs, tt.port, tt.protocol)) + }) + } +} + +func TestAddExternalIPConf(t *testing.T) { + tests := []struct { + name string + externalIPs []net.IP + port uint16 + protocol binding.Protocol + expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) + }{ + { + name: "ipv4 tcp", + externalIPs: []net.IP{ + net.ParseIP("1.1.1.1"), + net.ParseIP("1.1.2.2"), + }, + port: 80, + protocol: binding.ProtocolTCP, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.AddEntry(antreaExternalIPIPSet, "1.1.1.1,tcp:80") + ipset.AddEntry(antreaExternalIPIPSet, "1.1.2.2,tcp:80") + }, + }, + { + name: "ipv6 udp", + externalIPs: []net.IP{ + net.ParseIP("fd00:1234:5678:dead:beaf::1"), + net.ParseIP("fd00:1234:5678:dead:beaf::2"), + }, + port: 8080, + protocol: binding.ProtocolUDPv6, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.AddEntry(antreaExternalIPIP6Set, "fd00:1234:5678:dead:beaf::1,udp:8080") + ipset.AddEntry(antreaExternalIPIP6Set, "fd00:1234:5678:dead:beaf::2,udp:8080") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + ipset := ipsettest.NewMockInterface(ctrl) + c := &Client{ipset: ipset, serviceIPSets: serviceIPSets} + tt.expectedCalls(ipset.EXPECT()) + assert.NoError(t, c.AddExternalIPConf(tt.externalIPs, tt.port, tt.protocol)) + }) + } +} + +func TestDeleteExternalIPConf(t *testing.T) { + tests := []struct { + name string + externalIPs []net.IP + port uint16 + protocol binding.Protocol + isDSR bool + expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) + }{ + { + name: "ipv4 tcp", + externalIPs: []net.IP{ + net.ParseIP("1.1.1.1"), + net.ParseIP("1.1.2.2"), + }, + port: 80, + protocol: binding.ProtocolTCP, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.DelEntry(antreaExternalIPIPSet, "1.1.1.1,tcp:80") + ipset.DelEntry(antreaExternalIPIPSet, "1.1.2.2,tcp:80") + }, + }, + { + name: "ipv6 udp", + externalIPs: []net.IP{ + net.ParseIP("fd00:1234:5678:dead:beaf::1"), + net.ParseIP("fd00:1234:5678:dead:beaf::2"), + }, + port: 8080, + protocol: binding.ProtocolUDPv6, + expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { + ipset.DelEntry(antreaExternalIPIP6Set, "fd00:1234:5678:dead:beaf::1,udp:8080") + ipset.DelEntry(antreaExternalIPIP6Set, "fd00:1234:5678:dead:beaf::2,udp:8080") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + ipset := ipsettest.NewMockInterface(ctrl) + c := &Client{ipset: ipset, serviceIPSets: serviceIPSets} tt.expectedCalls(ipset.EXPECT()) - assert.NoError(t, c.DeleteNodePort(tt.nodePortAddresses, tt.port, tt.protocol)) + assert.NoError(t, c.DeleteExternalIPConf(tt.externalIPs, tt.port, tt.protocol)) }) } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 8fa585265a5..37d3c637820 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -76,6 +76,7 @@ type Client struct { func NewClient(networkConfig *config.NetworkConfig, noSNAT bool, proxyAll bool, + isKubeAPIServerOverridden bool, connectUplinkToBridge bool, nodeNetworkPolicyEnabled bool, multicastEnabled bool, @@ -476,7 +477,7 @@ func (c *Client) DeleteSNATRule(mark uint32) error { } // TODO: nodePortAddresses is not supported currently. -func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +func (c *Client) AddNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { netNatStaticMapping := &winnet.NetNatStaticMapping{ Name: antreaNatNodePort, ExternalIP: net.ParseIP("0.0.0.0"), @@ -493,7 +494,7 @@ func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol b return nil } -func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +func (c *Client) DeleteNodePortConf(nodePortIPs []net.IP, port uint16, protocol binding.Protocol) error { key := fmt.Sprintf("%d-%s", port, protocol) obj, found := c.netNatStaticMappings.Load(key) if !found { @@ -509,6 +510,22 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco return nil } +func (c *Client) AddLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol) error { + return nil +} + +func (c *Client) DeleteLoadBalancerConf(loadBalancerIPs []net.IP, port uint16, protocol binding.Protocol) error { + return nil +} + +func (c *Client) AddExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol) error { + return nil +} + +func (c *Client) DeleteExternalIPConf(externalIPs []net.IP, port uint16, protocol binding.Protocol) error { + return nil +} + // AddExternalIPRoute adds a route entry that forwards traffic destined for the external IP to the Antrea gateway interface. func (c *Client) AddExternalIPRoute(externalIP net.IP) error { externalIPStr := externalIP.String() diff --git a/pkg/agent/route/route_windows_test.go b/pkg/agent/route/route_windows_test.go index 0f1648c1002..7173fce0130 100644 --- a/pkg/agent/route/route_windows_test.go +++ b/pkg/agent/route/route_windows_test.go @@ -260,17 +260,17 @@ func TestDeleteRoutes(t *testing.T) { assert.NoError(t, c.DeleteRoutes(podCIDR)) } -func TestAddNodePort(t *testing.T) { +func TestAddNodePortConf(t *testing.T) { ctrl := gomock.NewController(t) mockWinnet := winnettesting.NewMockInterface(ctrl) c := &Client{ winnet: mockWinnet, } mockWinnet.EXPECT().ReplaceNetNatStaticMapping(nodePortNetNatStaticMapping) - assert.NoError(t, c.AddNodePort(nil, nodePort, protocol)) + assert.NoError(t, c.AddNodePortConf(nil, nodePort, protocol)) } -func TestDeleteNodePort(t *testing.T) { +func TestDeleteNodePortConf(t *testing.T) { ctrl := gomock.NewController(t) mockWinnet := winnettesting.NewMockInterface(ctrl) c := &Client{ @@ -278,7 +278,7 @@ func TestDeleteNodePort(t *testing.T) { } c.netNatStaticMappings.Store(fmt.Sprintf("%d-%s", nodePort, protocol), nodePortNetNatStaticMapping) mockWinnet.EXPECT().RemoveNetNatStaticMapping(nodePortNetNatStaticMapping) - assert.NoError(t, c.DeleteNodePort(nil, nodePort, protocol)) + assert.NoError(t, c.DeleteNodePortConf(nil, nodePort, protocol)) } func TestAddServiceCIDRRoute(t *testing.T) { diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 6153d2f3d0c..26219b8c4c8 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// Copyright 2024 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -84,6 +84,20 @@ func (mr *MockInterfaceMockRecorder) AddEgressRule(arg0, arg1 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddEgressRule", reflect.TypeOf((*MockInterface)(nil).AddEgressRule), arg0, arg1) } +// AddExternalIPConf mocks base method. +func (m *MockInterface) AddExternalIPConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddExternalIPConf", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddExternalIPConf indicates an expected call of AddExternalIPConf. +func (mr *MockInterfaceMockRecorder) AddExternalIPConf(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalIPConf", reflect.TypeOf((*MockInterface)(nil).AddExternalIPConf), arg0, arg1, arg2) +} + // AddExternalIPRoute mocks base method. func (m *MockInterface) AddExternalIPRoute(arg0 net.IP) error { m.ctrl.T.Helper() @@ -98,6 +112,20 @@ func (mr *MockInterfaceMockRecorder) AddExternalIPRoute(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalIPRoute", reflect.TypeOf((*MockInterface)(nil).AddExternalIPRoute), arg0) } +// AddLoadBalancerConf mocks base method. +func (m *MockInterface) AddLoadBalancerConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddLoadBalancerConf", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddLoadBalancerConf indicates an expected call of AddLoadBalancerConf. +func (mr *MockInterfaceMockRecorder) AddLoadBalancerConf(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLoadBalancerConf", reflect.TypeOf((*MockInterface)(nil).AddLoadBalancerConf), arg0, arg1, arg2) +} + // AddLocalAntreaFlexibleIPAMPodRule mocks base method. func (m *MockInterface) AddLocalAntreaFlexibleIPAMPodRule(arg0 []net.IP) error { m.ctrl.T.Helper() @@ -112,18 +140,18 @@ func (mr *MockInterfaceMockRecorder) AddLocalAntreaFlexibleIPAMPodRule(arg0 any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLocalAntreaFlexibleIPAMPodRule", reflect.TypeOf((*MockInterface)(nil).AddLocalAntreaFlexibleIPAMPodRule), arg0) } -// AddNodePort mocks base method. -func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { +// AddNodePortConf mocks base method. +func (m *MockInterface) AddNodePortConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddNodePort", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "AddNodePortConf", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// AddNodePort indicates an expected call of AddNodePort. -func (mr *MockInterfaceMockRecorder) AddNodePort(arg0, arg1, arg2 any) *gomock.Call { +// AddNodePortConf indicates an expected call of AddNodePortConf. +func (mr *MockInterfaceMockRecorder) AddNodePortConf(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePort", reflect.TypeOf((*MockInterface)(nil).AddNodePort), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePortConf", reflect.TypeOf((*MockInterface)(nil).AddNodePortConf), arg0, arg1, arg2) } // AddOrUpdateNodeNetworkPolicyIPSet mocks base method. @@ -238,6 +266,20 @@ func (mr *MockInterfaceMockRecorder) DeleteEgressRule(arg0, arg1 any) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteEgressRule", reflect.TypeOf((*MockInterface)(nil).DeleteEgressRule), arg0, arg1) } +// DeleteExternalIPConf mocks base method. +func (m *MockInterface) DeleteExternalIPConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteExternalIPConf", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteExternalIPConf indicates an expected call of DeleteExternalIPConf. +func (mr *MockInterfaceMockRecorder) DeleteExternalIPConf(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExternalIPConf", reflect.TypeOf((*MockInterface)(nil).DeleteExternalIPConf), arg0, arg1, arg2) +} + // DeleteExternalIPRoute mocks base method. func (m *MockInterface) DeleteExternalIPRoute(arg0 net.IP) error { m.ctrl.T.Helper() @@ -252,6 +294,20 @@ func (mr *MockInterfaceMockRecorder) DeleteExternalIPRoute(arg0 any) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExternalIPRoute", reflect.TypeOf((*MockInterface)(nil).DeleteExternalIPRoute), arg0) } +// DeleteLoadBalancerConf mocks base method. +func (m *MockInterface) DeleteLoadBalancerConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteLoadBalancerConf", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteLoadBalancerConf indicates an expected call of DeleteLoadBalancerConf. +func (mr *MockInterfaceMockRecorder) DeleteLoadBalancerConf(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLoadBalancerConf", reflect.TypeOf((*MockInterface)(nil).DeleteLoadBalancerConf), arg0, arg1, arg2) +} + // DeleteLocalAntreaFlexibleIPAMPodRule mocks base method. func (m *MockInterface) DeleteLocalAntreaFlexibleIPAMPodRule(arg0 []net.IP) error { m.ctrl.T.Helper() @@ -294,18 +350,18 @@ func (mr *MockInterfaceMockRecorder) DeleteNodeNetworkPolicyIPTables(arg0, arg1 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodeNetworkPolicyIPTables", reflect.TypeOf((*MockInterface)(nil).DeleteNodeNetworkPolicyIPTables), arg0, arg1) } -// DeleteNodePort mocks base method. -func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { +// DeleteNodePortConf mocks base method. +func (m *MockInterface) DeleteNodePortConf(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteNodePort", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "DeleteNodePortConf", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// DeleteNodePort indicates an expected call of DeleteNodePort. -func (mr *MockInterfaceMockRecorder) DeleteNodePort(arg0, arg1, arg2 any) *gomock.Call { +// DeleteNodePortConf indicates an expected call of DeleteNodePortConf. +func (mr *MockInterfaceMockRecorder) DeleteNodePortConf(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePort", reflect.TypeOf((*MockInterface)(nil).DeleteNodePort), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePortConf", reflect.TypeOf((*MockInterface)(nil).DeleteNodePortConf), arg0, arg1, arg2) } // DeleteRouteForLink mocks base method. diff --git a/pkg/agent/util/iptables/builder.go b/pkg/agent/util/iptables/builder.go index c0dcad4dd7e..deb8b75340e 100644 --- a/pkg/agent/util/iptables/builder.go +++ b/pkg/agent/util/iptables/builder.go @@ -23,6 +23,8 @@ import ( "strings" "k8s.io/apimachinery/pkg/util/intstr" + + "antrea.io/antrea/pkg/agent/util/ipset" ) type iptablesRule struct { @@ -50,7 +52,7 @@ func (b *iptablesRuleBuilder) writeSpec(spec string) { } func (b *iptablesRuleBuilder) MatchCIDRSrc(cidr string) IPTablesRuleBuilder { - if cidr == "" || cidr == "0.0.0.0/0" || cidr == "::/0" { + if cidr == "" || cidr == "0.0.0.0/0" || cidr == "::/0" || cidr == "0.0.0.0" || cidr == "::" { return b } matchStr := fmt.Sprintf("-s %s", cidr) @@ -59,7 +61,7 @@ func (b *iptablesRuleBuilder) MatchCIDRSrc(cidr string) IPTablesRuleBuilder { } func (b *iptablesRuleBuilder) MatchCIDRDst(cidr string) IPTablesRuleBuilder { - if cidr == "" || cidr == "0.0.0.0/0" || cidr == "::/0" { + if cidr == "" || cidr == "0.0.0.0/0" || cidr == "::/0" || cidr == "0.0.0.0" || cidr == "::" { return b } matchStr := fmt.Sprintf("-d %s", cidr) @@ -67,20 +69,38 @@ func (b *iptablesRuleBuilder) MatchCIDRDst(cidr string) IPTablesRuleBuilder { return b } -func (b *iptablesRuleBuilder) MatchIPSetSrc(ipset string) IPTablesRuleBuilder { - if ipset == "" { +func (b *iptablesRuleBuilder) MatchIPSetSrc(ipsetName string, ipsetType ipset.SetType) IPTablesRuleBuilder { + if ipsetName == "" { return b } - matchStr := fmt.Sprintf("-m set --match-set %s src", ipset) + var typeStr string + switch ipsetType { + case ipset.HashNet: + fallthrough + case ipset.HashIP: + typeStr = "src" + case ipset.HashIPPort: + typeStr = "src,src" + } + matchStr := fmt.Sprintf("-m set --match-set %s %s", ipsetName, typeStr) b.writeSpec(matchStr) return b } -func (b *iptablesRuleBuilder) MatchIPSetDst(ipset string) IPTablesRuleBuilder { - if ipset == "" { +func (b *iptablesRuleBuilder) MatchIPSetDst(ipsetName string, ipsetType ipset.SetType) IPTablesRuleBuilder { + if ipsetName == "" { return b } - matchStr := fmt.Sprintf("-m set --match-set %s dst", ipset) + var typeStr string + switch ipsetType { + case ipset.HashNet: + fallthrough + case ipset.HashIP: + typeStr = "dst" + case ipset.HashIPPort: + typeStr = "dst,dst" + } + matchStr := fmt.Sprintf("-m set --match-set %s %s", ipsetName, typeStr) b.writeSpec(matchStr) return b } @@ -94,7 +114,7 @@ func (b *iptablesRuleBuilder) MatchTransProtocol(protocol string) IPTablesRuleBu return b } -func (b *iptablesRuleBuilder) MatchDstPort(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder { +func (b *iptablesRuleBuilder) MatchPortDst(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder { if port == nil { return b } @@ -108,7 +128,7 @@ func (b *iptablesRuleBuilder) MatchDstPort(port *intstr.IntOrString, endPort *in return b } -func (b *iptablesRuleBuilder) MatchSrcPort(port, endPort *int32) IPTablesRuleBuilder { +func (b *iptablesRuleBuilder) MatchPortSrc(port, endPort *int32) IPTablesRuleBuilder { if port == nil { return b } @@ -178,6 +198,21 @@ func (b *iptablesRuleBuilder) SetTarget(target string) IPTablesRuleBuilder { return b } +func (b *iptablesRuleBuilder) SetTargetDNATToDst(dnatIP string, dnatPort *int32) IPTablesRuleBuilder { + if dnatIP == "" { + return b + } + var dstStr string + if dnatPort != nil { + dstStr = fmt.Sprintf("%s:%d", dnatIP, *dnatPort) + } else { + dstStr = dnatIP + } + specStr := fmt.Sprintf("--to-destination %s", dstStr) + b.writeSpec(specStr) + return b +} + func (b *iptablesRuleBuilder) SetComment(comment string) IPTablesRuleBuilder { if comment == "" { return b diff --git a/pkg/agent/util/iptables/builder_test.go b/pkg/agent/util/iptables/builder_test.go index c3da571a9a9..c03fc94d949 100644 --- a/pkg/agent/util/iptables/builder_test.go +++ b/pkg/agent/util/iptables/builder_test.go @@ -22,6 +22,8 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/intstr" + + "antrea.io/antrea/pkg/agent/util/ipset" ) var ( @@ -50,11 +52,11 @@ func TestBuilders(t *testing.T) { name: "Accept TCP destination 8080 in FORWARD", chain: ForwardChain, buildFunc: func(builder IPTablesRuleBuilder) IPTablesRule { - return builder.MatchIPSetSrc(ipsetAlfa). - MatchIPSetDst(ipsetBravo). + return builder.MatchIPSetSrc(ipsetAlfa, ipset.HashIP). + MatchIPSetDst(ipsetBravo, ipset.HashIP). MatchInputInterface(eth0). MatchTransProtocol(ProtocolTCP). - MatchDstPort(port8080, nil). + MatchPortDst(port8080, nil). MatchCIDRSrc(cidr). SetComment("Accept TCP 8080"). SetTarget(AcceptTarget). @@ -66,10 +68,10 @@ func TestBuilders(t *testing.T) { name: "Drop UDP destination 137-139 in INPUT", chain: "INPUT", buildFunc: func(builder IPTablesRuleBuilder) IPTablesRule { - return builder.MatchIPSetSrc(ipsetAlfa). + return builder.MatchIPSetSrc(ipsetAlfa, ipset.HashIP). MatchInputInterface(eth0). MatchTransProtocol(ProtocolUDP). - MatchDstPort(port137, &port139). + MatchPortDst(port137, &port139). MatchCIDRDst(cidr). SetComment("Drop UDP 137-139"). SetTarget(DropTarget). @@ -83,7 +85,7 @@ func TestBuilders(t *testing.T) { buildFunc: func(builder IPTablesRuleBuilder) IPTablesRule { return builder.MatchOutputInterface(eth1). MatchTransProtocol(ProtocolSCTP). - MatchSrcPort(&port40000, &port50000). + MatchPortSrc(&port40000, &port50000). SetComment("Drop SCTP 40000-50000"). SetTarget(DropTarget). Done() @@ -123,6 +125,20 @@ func TestBuilders(t *testing.T) { }, expected: `-A INPUT -p tcp -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT`, }, + { + name: "DNAT packets from specific connections", + chain: PreRoutingChain, + buildFunc: func(builder IPTablesRuleBuilder) IPTablesRule { + return builder.MatchCIDRSrc("192.168.77.100"). + MatchCIDRDst("10.96.0.10"). + MatchTransProtocol(ProtocolTCP). + MatchPortDst(port8080, nil). + SetTarget(DNATTarget). + SetTargetDNATToDst("10.10.0.2", &port40000). + Done() + }, + expected: `-A PREROUTING -s 192.168.77.100 -d 10.96.0.10 -p tcp --dport 8080 -j DNAT --to-destination 10.10.0.2:40000`, + }, } for _, tc := range testCases { diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index d436f80afbd..3eee6ba1d00 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -28,6 +28,8 @@ import ( "github.com/coreos/go-iptables/iptables" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/util/ipset" ) const ( @@ -46,6 +48,7 @@ const ( SNATTarget = "SNAT" DNATTarget = "DNAT" RejectTarget = "REJECT" + NotrackTarget = "NOTRACK" PreRoutingChain = "PREROUTING" InputChain = "INPUT" @@ -99,7 +102,7 @@ type Interface interface { DeleteChain(protocol Protocol, table string, chain string) error - ListRules(table string, chain string) ([]string, error) + ListRules(protocol Protocol, table string, chain string) (map[Protocol][]string, error) Restore(data string, flush bool, useIPv6 bool) error @@ -109,16 +112,17 @@ type Interface interface { type IPTablesRuleBuilder interface { MatchCIDRSrc(cidr string) IPTablesRuleBuilder MatchCIDRDst(cidr string) IPTablesRuleBuilder - MatchIPSetSrc(ipset string) IPTablesRuleBuilder - MatchIPSetDst(ipset string) IPTablesRuleBuilder + MatchIPSetSrc(ipset string, ipsetType ipset.SetType) IPTablesRuleBuilder + MatchIPSetDst(ipset string, ipsetType ipset.SetType) IPTablesRuleBuilder MatchTransProtocol(protocol string) IPTablesRuleBuilder - MatchDstPort(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder - MatchSrcPort(port, endPort *int32) IPTablesRuleBuilder + MatchPortDst(port *intstr.IntOrString, endPort *int32) IPTablesRuleBuilder + MatchPortSrc(port, endPort *int32) IPTablesRuleBuilder MatchICMP(icmpType, icmpCode *int32, ipProtocol Protocol) IPTablesRuleBuilder MatchEstablishedOrRelated() IPTablesRuleBuilder MatchInputInterface(interfaceName string) IPTablesRuleBuilder MatchOutputInterface(interfaceName string) IPTablesRuleBuilder SetTarget(target string) IPTablesRuleBuilder + SetTargetDNATToDst(dnatIP string, dnatPort *int32) IPTablesRuleBuilder SetComment(comment string) IPTablesRuleBuilder CopyBuilder() IPTablesRuleBuilder Done() IPTablesRule @@ -309,14 +313,18 @@ func (c *Client) DeleteChain(protocol Protocol, table string, chain string) erro } // ListRules lists all rules from a chain in a table. -func (c *Client) ListRules(table string, chain string) ([]string, error) { - var allRules []string +func (c *Client) ListRules(protocol Protocol, table string, chain string) (map[Protocol][]string, error) { + allRules := make(map[Protocol][]string) for p := range c.ipts { - rules, err := c.ipts[p].List(table, chain) + ipt := c.ipts[p] + if !matchProtocol(ipt, protocol) { + continue + } + rules, err := ipt.List(table, chain) if err != nil { - return rules, fmt.Errorf("error getting rules from table %s chain %s protocol %s: %v", table, chain, p, err) + return nil, fmt.Errorf("error getting rules from table %s chain %s protocol %s: %v", table, chain, p, err) } - allRules = append(allRules, rules...) + allRules[p] = rules } return allRules, nil } diff --git a/pkg/agent/util/iptables/testing/mock_iptables_linux.go b/pkg/agent/util/iptables/testing/mock_iptables_linux.go index 00c30f50390..290e03a4823 100644 --- a/pkg/agent/util/iptables/testing/mock_iptables_linux.go +++ b/pkg/agent/util/iptables/testing/mock_iptables_linux.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// Copyright 2024 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -139,18 +139,18 @@ func (mr *MockInterfaceMockRecorder) InsertRule(arg0, arg1, arg2, arg3 any) *gom } // ListRules mocks base method. -func (m *MockInterface) ListRules(arg0, arg1 string) ([]string, error) { +func (m *MockInterface) ListRules(arg0 iptables.Protocol, arg1, arg2 string) (map[iptables.Protocol][]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListRules", arg0, arg1) - ret0, _ := ret[0].([]string) + ret := m.ctrl.Call(m, "ListRules", arg0, arg1, arg2) + ret0, _ := ret[0].(map[iptables.Protocol][]string) ret1, _ := ret[1].(error) return ret0, ret1 } // ListRules indicates an expected call of ListRules. -func (mr *MockInterfaceMockRecorder) ListRules(arg0, arg1 any) *gomock.Call { +func (mr *MockInterfaceMockRecorder) ListRules(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListRules", reflect.TypeOf((*MockInterface)(nil).ListRules), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListRules", reflect.TypeOf((*MockInterface)(nil).ListRules), arg0, arg1, arg2) } // Restore mocks base method. diff --git a/pkg/util/env/env.go b/pkg/util/env/env.go index b23e92d1d4a..b187a4c3568 100644 --- a/pkg/util/env/env.go +++ b/pkg/util/env/env.go @@ -21,6 +21,7 @@ import ( "k8s.io/klog/v2" + "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/util/runtime" ) @@ -134,3 +135,27 @@ func GetAntreaNamespace() string { func GetAllowNoEncapWithoutAntreaProxy() bool { return getBoolEnvVar(allowNoEncapWithoutAntreaProxyEnvKey, false) } + +func GetKubeServiceHost() string { + kubeServiceHost := os.Getenv(k8s.KubeServiceHostEnvKey) + if kubeServiceHost == "" { + klog.Warningf("Environment variable %s not found", k8s.KubeServiceHostEnvKey) + } + return kubeServiceHost +} + +func GetKubeServicePort() int32 { + kubeServicePortStr := os.Getenv(k8s.KubeServicePortEnvKey) + + if kubeServicePortStr == "" { + klog.Warningf("Environment variable %s not found", k8s.KubeServicePortEnvKey) + } else { + parsedValue, err := strconv.ParseInt(kubeServicePortStr, 10, 16) + if err != nil { + klog.Errorf("Failed to parse env variable '%s': %v", k8s.KubeServicePortEnvKey, err) + return 0 + } + return int32(parsedValue) + } + return 0 +} diff --git a/pkg/util/k8s/client.go b/pkg/util/k8s/client.go index 7ab555f170b..29e1f5b8faa 100644 --- a/pkg/util/k8s/client.go +++ b/pkg/util/k8s/client.go @@ -36,8 +36,8 @@ import ( ) const ( - kubeServiceHostEnvKey = "KUBERNETES_SERVICE_HOST" - kubeServicePortEnvKey = "KUBERNETES_SERVICE_PORT" + KubeServiceHostEnvKey = "KUBERNETES_SERVICE_HOST" + KubeServicePortEnvKey = "KUBERNETES_SERVICE_PORT" ) // CreateClients creates kube clients from the given config. @@ -125,8 +125,8 @@ func OverrideKubeAPIServer(kubeAPIServerOverride string) { host = hostPort port = "443" } - os.Setenv(kubeServiceHostEnvKey, host) - os.Setenv(kubeServicePortEnvKey, port) + os.Setenv(KubeServiceHostEnvKey, host) + os.Setenv(KubeServicePortEnvKey, port) } func EndpointSliceAPIAvailable(k8sClient clientset.Interface) (bool, error) { diff --git a/pkg/util/k8s/client_test.go b/pkg/util/k8s/client_test.go index 0b94a1a707a..cddfab5a960 100644 --- a/pkg/util/k8s/client_test.go +++ b/pkg/util/k8s/client_test.go @@ -108,12 +108,12 @@ func TestOverrideKubeAPIServer(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - t.Setenv(kubeServiceHostEnvKey, originalHost) - t.Setenv(kubeServicePortEnvKey, originalPort) + t.Setenv(KubeServiceHostEnvKey, originalHost) + t.Setenv(KubeServicePortEnvKey, originalPort) OverrideKubeAPIServer(tt.kubeAPIServerOverride) - assert.Equal(t, tt.expectHost, os.Getenv(kubeServiceHostEnvKey)) - assert.Equal(t, tt.expectPort, os.Getenv(kubeServicePortEnvKey)) + assert.Equal(t, tt.expectHost, os.Getenv(KubeServiceHostEnvKey)) + assert.Equal(t, tt.expectPort, os.Getenv(KubeServicePortEnvKey)) }) } } diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index 9f87aa1264d..3cec2b7054c 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -141,6 +142,33 @@ func TestProxyLoadBalancerServiceIPv6(t *testing.T) { testProxyLoadBalancerService(t, true) } +func getExpectedHealthOutputTmpl(data *TestData) (string, error) { + _, err := data.clientset.AppsV1().DaemonSets(kubeNamespace).Get(context.TODO(), "kube-proxy", metav1.GetOptions{}) + if err == nil { + // When kube-proxy is present, healthServer in kube-proxy will be initialized. + return fmt.Sprintf(`{ + "service": { + "namespace": "%s", + "name": "agnhost-local" + }, + "localEndpoints": 1, + "serviceProxyHealthy": true +}`, data.testNamespace), nil + } else if apierrors.IsNotFound(err) { + // When kube-proxy is removed, healthServer in AntreaProxy will be initialized, and field `serviceProxyHealthy` + // is not supported yet in AntreaProxy healthServer. + return fmt.Sprintf(`{ + "service": { + "namespace": "%s", + "name": "agnhost-local" + }, + "localEndpoints": 1 +}`, data.testNamespace), nil + } else { + return "", err + } +} + func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) { skipIfHasWindowsNodes(t) skipIfNumNodesLessThan(t, 2) @@ -185,14 +213,9 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) { for _, nodeIP := range nodeIPs { healthUrls = append(healthUrls, net.JoinHostPort(nodeIP, healthPort)) } - healthOutputTmpl := `{ - "service": { - "namespace": "%s", - "name": "agnhost-local" - }, - "localEndpoints": 1 -}` - healthExpected := fmt.Sprintf(healthOutputTmpl, data.testNamespace) + + healthExpected, err := getExpectedHealthOutputTmpl(data) + require.NoError(t, err) port := "8080" clusterUrl := net.JoinHostPort(clusterIngressIP[0], port) diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index 63ed6adcab8..c0e774c111c 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -146,7 +146,7 @@ func TestInitialize(t *testing.T) { for _, tc := range tcs { t.Logf("Running Initialize test with mode %s node config %s", tc.networkConfig.TrafficEncapMode, nodeConfig) - routeClient, err := route.NewClient(tc.networkConfig, tc.noSNAT, false, false, false, false, nil) + routeClient, err := route.NewClient(tc.networkConfig, tc.noSNAT, false, false, false, false, false, nil) assert.NoError(t, err) var xtablesReleasedTime, initializedTime time.Time @@ -253,7 +253,7 @@ func TestIpTablesSync(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap, IPv4Enabled: true}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap, IPv4Enabled: true}, false, false, false, false, false, false, nil) assert.Nil(t, err) inited := make(chan struct{}) @@ -304,7 +304,7 @@ func TestAddAndDeleteSNATRule(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap, IPv4Enabled: true}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap, IPv4Enabled: true}, false, false, false, false, false, false, nil) assert.Nil(t, err) inited := make(chan struct{}) @@ -358,7 +358,7 @@ func TestAddAndDeleteRoutes(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig) - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: tc.mode, IPv4Enabled: true}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: tc.mode, IPv4Enabled: true}, false, false, false, false, false, false, nil) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -423,7 +423,7 @@ func TestSyncRoutes(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig) - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: tc.mode, IPv4Enabled: true}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: tc.mode, IPv4Enabled: true}, false, false, false, false, false, false, nil) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -466,7 +466,7 @@ func TestSyncGatewayKernelRoute(t *testing.T) { } require.NoError(t, netlink.AddrAdd(gwLink, &netlink.Addr{IPNet: gwNet}), "configuring gw IP failed") - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false, false, false, false, false, false, nil) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -560,7 +560,7 @@ func TestReconcile(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s added routes %v desired routes %v", tc.mode, tc.addedRoutes, tc.desiredPeerCIDRs) - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: tc.mode, IPv4Enabled: true}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: tc.mode, IPv4Enabled: true}, false, false, false, false, false, false, nil) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -599,7 +599,7 @@ func TestRouteTablePolicyOnly(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly, IPv4Enabled: true}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly, IPv4Enabled: true}, false, false, false, false, false, false, nil) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -655,7 +655,7 @@ func TestIPv6RoutesAndNeighbors(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap, IPv4Enabled: true, IPv6Enabled: true}, false, false, false, false, false, nil) + routeClient, err := route.NewClient(&config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap, IPv4Enabled: true, IPv6Enabled: true}, false, false, false, false, false, false, nil) assert.Nil(t, err) _, ipv6Subnet, _ := net.ParseCIDR("fd74:ca9b:172:19::/64") gwIPv6 := net.ParseIP("fd74:ca9b:172:19::1")