diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml index a8b1748313f..3ade70978e5 100644 --- a/.github/workflows/kind.yml +++ b/.github/workflows/kind.yml @@ -170,6 +170,7 @@ jobs: --coverage \ --encap-mode encap \ --proxy-all \ + --no-kube-proxy \ --feature-gates LoadBalancerModeDSR=true \ --load-balancer-mode dsr \ --node-ipam diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 17de0770d2a..03eb02d37fe 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -28,6 +28,7 @@ _usage="Usage: $0 [--encap-mode ] [--ip-family ] [--coverage] --feature-gates A comma-separated list of key=value pairs that describe feature gates, e.g. AntreaProxy=true,Egress=false. --run Run only tests matching the regexp. --proxy-all Enables Antrea proxy with all Service support. + --no-kube-proxy Don't deploy kube-proxy. --load-balancer-mode LoadBalancer mode. --node-ipam Enables Antrea NodeIPAM. --multicast Enables Multicast. @@ -72,6 +73,7 @@ mode="" ipfamily="v4" feature_gates="" proxy_all=false +no_kube_proxy=false load_balancer_mode="" node_ipam=false multicast=false @@ -106,6 +108,10 @@ case $key in proxy_all=true shift ;; + --no-kube-proxy) + no_kube_proxy=true + shift + ;; --load-balancer-mode) load_balancer_mode="$2" shift 2 @@ -299,7 +305,7 @@ function setup_cluster { echoerr "invalid value for --ip-family \"$ipfamily\", expected \"v4\" or \"v6\"" exit 1 fi - if $proxy_all; then + if $no_kube_proxy; then args="$args --no-kube-proxy" fi if $node_ipam; then @@ -353,7 +359,7 @@ function run_test { cat $CH_OPERATOR_YML | docker exec -i kind-control-plane dd of=/root/clickhouse-operator-install-bundle.yml fi - if $proxy_all; then + if $no_kube_proxy; then apiserver=$(docker exec -i kind-control-plane kubectl get endpoints kubernetes --no-headers | awk '{print $2}') if $coverage; then docker exec -i kind-control-plane sed -i.bak -E "s/^[[:space:]]*[#]?kubeAPIServerOverride[[:space:]]*:[[:space:]]*[a-z\"]+[[:space:]]*$/ kubeAPIServerOverride: \"$apiserver\"/" /root/antrea-coverage.yml /root/antrea-ipsec-coverage.yml diff --git a/docs/antrea-proxy.md b/docs/antrea-proxy.md index 1469a76fb49..313642963a4 100644 --- a/docs/antrea-proxy.md +++ b/docs/antrea-proxy.md @@ -42,13 +42,21 @@ the introduction of `proxyAll`, Antrea relied on userspace kube-proxy, which is no longer actively maintained by the K8s community and is slower than other kube-proxy backends. -Note that on Linux, even when `proxyAll` is enabled, kube-proxy will usually -take priority and will keep handling NodePort Service traffic (unless the source -is a Pod, which is pretty unusual as Pods typically access Services by -ClusterIP). This is because kube-proxy rules typically come before the rules -installed by AntreaProxy to redirect traffic to OVS. When kube-proxy is not -deployed or is removed from the cluster, AntreaProxy will then handle all -Service traffic. +Note that on Linux, before Antrea v2.1, when `proxyAll` is enabled, kube-proxy +will usually take priority over AntreaProxy and will keep handling all kinds of +Service traffic (unless the source is a Pod, which is pretty unusual as Pods +typically access Services by ClusterIP). This is because kube-proxy rules typically +come before the rules installed by AntreaProxy to redirect traffic to OVS. When +kube-proxy is not deployed or is removed from the cluster, AntreaProxy will then +handle all Service traffic. + +Starting with Antrea v2.1, when only `proxyAll` is enabled, AntreaProxy is capable +of handling all kinds of Service traffic except ClusterIP, even if kube-proxy in +iptables mode is present. This is accomplished by prioritizing the rules installed +by AntreaProxy redirecting Service traffic to OVS over those installed by kube-proxy. + +For kube-proxy in IPVS mode, you can either configure it to use iptables mode or +remove it. ### Removing kube-proxy diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index f1ba212229f..e38ce95f5cb 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -32,7 +32,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" apimachinerytypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" coreinformers "k8s.io/client-go/informers/core/v1" discoveryinformers "k8s.io/client-go/informers/discovery/v1" clientset "k8s.io/client-go/kubernetes" @@ -121,13 +120,6 @@ type proxier struct { serviceHealthServer healthcheck.ServiceHealthServer numLocalEndpoints map[apimachinerytypes.NamespacedName]int - // serviceIPRouteReferences tracks the references of Service IP routes. The key is the Service IP and the value is - // the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a - // ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts. - // With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it - // exactly once when it's no longer used by any ServicePorts. - // It applies to ClusterIP and LoadBalancerIP. - serviceIPRouteReferences map[string]sets.Set[string] // syncedOnce returns true if the proxier has synced rules at least once. syncedOnce bool syncedOnceMutex sync.RWMutex @@ -569,10 +561,10 @@ func (p *proxier) installNodePortService(localGroupID, clusterGroupID binding.Gr IsNested: false, // Unsupported for NodePort IsDSR: false, // Unsupported because external traffic has been DNAT'd in host network before it's forwarded to OVS. }); err != nil { - return fmt.Errorf("failed to install NodePort load balancing flows: %w", err) + return fmt.Errorf("failed to install NodePort load balancing OVS flows: %w", err) } - if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { - return fmt.Errorf("failed to install NodePort traffic redirecting rules: %w", err) + if err := p.routeClient.AddNodePortConfigs(p.nodePortAddresses, svcPort, protocol); err != nil { + return fmt.Errorf("failed to install NodePort traffic redirecting routing configurations: %w", err) } return nil } @@ -588,8 +580,8 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove NodePort load balancing flows: %w", err) } - if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err) + if err := p.routeClient.DeleteNodePortConfigs(p.nodePortAddresses, svcPort, protocol); err != nil { + return fmt.Errorf("failed to remove NodePort traffic redirecting routing configurations: %w", err) } return nil } @@ -618,10 +610,10 @@ func (p *proxier) installExternalIPService(svcInfoStr string, IsNested: false, // Unsupported for ExternalIP IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, }); err != nil { - return fmt.Errorf("failed to install ExternalIP load balancing flows: %w", err) + return fmt.Errorf("failed to install ExternalIP load balancing OVS flows: %w", err) } - if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil { - return fmt.Errorf("failed to install ExternalIP traffic redirecting routes: %w", err) + if err := p.routeClient.AddExternalIPConfigs(svcInfoStr, ip); err != nil { + return fmt.Errorf("failed to install ExternalIP load balancing routing configurations: %w", err) } } return nil @@ -631,10 +623,10 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString for _, externalIP := range externalIPStrings { ip := net.ParseIP(externalIP) if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove ExternalIP load balancing flows: %w", err) + return fmt.Errorf("failed to remove ExternalIP load balancing OVS flows: %w", err) } - if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil { - return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err) + if err := p.routeClient.DeleteExternalIPConfigs(svcInfoStr, ip); err != nil { + return fmt.Errorf("failed to remove ExternalIP traffic redirecting routing configurations: %w", err) } } return nil @@ -665,11 +657,11 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, IsNested: false, // Unsupported for LoadBalancerIP IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, }); err != nil { - return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) + return fmt.Errorf("failed to install LoadBalancerIP load balancing OVS flows: %w", err) } if p.proxyAll { - if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil { - return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) + if err := p.routeClient.AddExternalIPConfigs(svcInfoStr, ip); err != nil { + return fmt.Errorf("failed to install LoadBalancerIP traffic redirecting routing configurations: %w", err) } } } @@ -677,33 +669,16 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, return nil } -func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn func(net.IP) error) error { - ipStr := ip.String() - references, exists := p.serviceIPRouteReferences[ipStr] - // If the IP was not referenced by any Service port, install a route for it. - // Otherwise, just reference it. - if !exists { - if err := addRouteFn(ip); err != nil { - return err - } - references = sets.New[string](svcInfoStr) - p.serviceIPRouteReferences[ipStr] = references - } else { - references.Insert(svcInfoStr) - } - return nil -} - func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { ip := net.ParseIP(ingress) if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err) + return fmt.Errorf("failed to remove LoadBalancerIP load balancing OVS flows: %w", err) } if p.proxyAll { - if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil { - return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) + if err := p.routeClient.DeleteExternalIPConfigs(svcInfoStr, ip); err != nil { + return fmt.Errorf("failed to remove LoadBalancerIP traffic redirecting routing configurations: %w", err) } } } @@ -711,25 +686,6 @@ func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIP return nil } -func (p *proxier) deleteRouteForServiceIP(svcInfoStr string, ip net.IP, deleteRouteFn func(net.IP) error) error { - ipStr := ip.String() - references, exists := p.serviceIPRouteReferences[ipStr] - // If the IP was not referenced by this Service port, skip it. - if exists && references.Has(svcInfoStr) { - // Delete the IP only if this Service port is the last one referencing it. - // Otherwise, just dereference it. - if references.Len() == 1 { - if err := deleteRouteFn(ip); err != nil { - return err - } - delete(p.serviceIPRouteReferences, ipStr) - } else { - references.Delete(svcInfoStr) - } - } - return nil -} - func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) @@ -1454,7 +1410,6 @@ func newProxier( endpointsInstalledMap: types.EndpointsMap{}, endpointsMap: types.EndpointsMap{}, endpointReferenceCounter: map[string]int{}, - serviceIPRouteReferences: map[string]sets.Set[string]{}, nodeLabels: map[string]string{}, serviceStringMap: map[string]k8sproxy.ServicePortName{}, groupCounter: groupCounter, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 3901f7f0251..48235254447 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -534,6 +534,7 @@ func testClusterIPAdd(t *testing.T, } allSvcs := append(extraSvcs, makeTestClusterIPService(&svcPortName, svcIP, externalIPs, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicy, true, nil)) makeServiceMap(fp, allSvcs...) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, corev1.ProtocolTCP) if !endpointSliceEnabled { remoteEpSubset := makeTestEndpointSubset(&svcPortName, ep1IP, int32(svcPort), corev1.ProtocolTCP, false) @@ -612,7 +613,7 @@ func testClusterIPAdd(t *testing.T, } } if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -671,6 +672,7 @@ func testLoadBalancerAdd(t *testing.T, &internalTrafficPolicy, externalTrafficPolicy) makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, corev1.ProtocolTCP) if !endpointSliceEnabled { remoteEpSubset := makeTestEndpointSubset(&svcPortName, ep1IP, int32(svcPort), corev1.ProtocolTCP, false) @@ -805,11 +807,11 @@ func testLoadBalancerAdd(t *testing.T, } } if proxyLoadBalancerIPs { - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) } fp.syncProxyRules() @@ -856,6 +858,7 @@ func testNodePortAdd(t *testing.T, internalTrafficPolicy, externalTrafficPolicy) makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, corev1.ProtocolTCP) if !endpointSliceEnabled { remoteEpSubset := makeTestEndpointSubset(&svcPortName, ep1IP, int32(svcPort), corev1.ProtocolTCP, false) @@ -957,9 +960,9 @@ func testNodePortAdd(t *testing.T, }) } } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) } fp.syncProxyRules() @@ -1139,6 +1142,8 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { }, } makeServiceMap(fp, svc) + svcPort80InfoStr := fmt.Sprintf("%s:%d/%s", svc1IPv4, port80Int32, corev1.ProtocolTCP) + svcPort443InfoStr := fmt.Sprintf("%s:%d/%s", svc1IPv4, port443Int32, corev1.ProtocolTCP) endpointSlice := &discovery.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ @@ -1227,8 +1232,8 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { ClusterGroupID: clusterGroupID1, IsExternal: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIPv4) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcPort80InfoStr, loadBalancerIPv4) localGroupID2 := fp.groupCounter.AllocateIfNotExist(svcPortName2, true) clusterGroupID2 := fp.groupCounter.AllocateIfNotExist(svcPortName2, false) @@ -1261,7 +1266,8 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { ClusterGroupID: clusterGroupID2, IsExternal: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcPort443InfoStr, loadBalancerIPv4) fp.syncProxyRules() @@ -1274,20 +1280,19 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port80Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcPort80InfoStr, loadBalancerIPv4) mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port443Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) // The route for the ClusterIP and the LoadBalancer IP should only be uninstalled once. - mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIPv4) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcPort443InfoStr, loadBalancerIPv4) fp.syncProxyRules() - - assert.Emptyf(t, fp.serviceIPRouteReferences, "serviceIPRouteReferences was not cleaned up after Service was removed") } func TestNodePortAdd(t *testing.T) { @@ -1507,6 +1512,7 @@ func testClusterIPRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool, n } svc := makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), apiProtocol, nil, &internalTrafficPolicy, true, nil) makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) var ep *corev1.Endpoints var eps *discovery.EndpointSlice @@ -1577,8 +1583,8 @@ func testClusterIPRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool, n mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), protocol) } if externalIP != nil { - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) - mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, externalIP) } if needClearConntrackEntries(protocol) { mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, protocol) @@ -1634,6 +1640,7 @@ func testNodePortRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool, en corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) var ep *corev1.Endpoints var eps *discovery.EndpointSlice @@ -1667,7 +1674,7 @@ func testNodePortRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool, en IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) if externalIP != nil { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: externalIP, @@ -1678,17 +1685,17 @@ func testNodePortRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool, en ClusterGroupID: 2, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) } mockOFClient.EXPECT().UninstallEndpointFlows(protocol, gomock.Any()) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), protocol) mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), protocol) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) if externalIP != nil { mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), protocol) - mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, externalIP) } if needClearConntrackEntries(protocol) { mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, protocol) @@ -1751,6 +1758,7 @@ func testLoadBalancerRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool &internalTrafficPolicy, externalTrafficPolicy) makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) var ep *corev1.Endpoints var eps *discovery.EndpointSlice @@ -1793,8 +1801,8 @@ func testLoadBalancerRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool ClusterGroupID: 2, IsExternal: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) if externalIP != nil { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: externalIP, @@ -1805,7 +1813,7 @@ func testLoadBalancerRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool ClusterGroupID: 2, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) } mockOFClient.EXPECT().UninstallEndpointFlows(protocol, gomock.Any()) @@ -1813,11 +1821,11 @@ func testLoadBalancerRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), protocol) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), protocol) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, loadBalancerIP) if externalIP != nil { mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), protocol) - mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, externalIP) } if needClearConntrackEntries(protocol) { mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, protocol) @@ -2098,11 +2106,11 @@ func testNodePortNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 bool IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), gomock.Any()) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), gomock.Any()) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), gomock.Any()) if needClearConntrackEntries(protocol) { for _, nodeIP := range nodePortAddresses { mockRouteClient.EXPECT().ClearConntrackEntryForService(nodeIP, uint16(svcNodePort), nil, protocol) @@ -2119,7 +2127,7 @@ func testNodePortNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 bool IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort)+1, gomock.Any()) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort)+1, gomock.Any()) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() } @@ -2180,6 +2188,8 @@ func testLoadBalancerNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 externalTrafficPolicy) makeServiceMap(fp, svc) makeEndpointSliceMap(fp) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) + updatedSvcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort+1, apiProtocol) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.Any()) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, gomock.Any()) @@ -2209,8 +2219,8 @@ func testLoadBalancerNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 ClusterGroupID: 2, IsExternal: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), gomock.Any()) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()) @@ -2220,8 +2230,8 @@ func testLoadBalancerNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, protocol) mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, protocol) } - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()) - mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), gomock.Any()) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, loadBalancerIP) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort + 1), @@ -2248,8 +2258,8 @@ func testLoadBalancerNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 ClusterGroupID: 2, IsExternal: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), gomock.Any()) + mockRouteClient.EXPECT().AddExternalIPConfigs(updatedSvcInfoStr, loadBalancerIP) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() } @@ -2367,6 +2377,7 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, protocol binding.Protocol, is &internalTrafficPolicy, externalTrafficPolicy) makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) @@ -2402,9 +2413,9 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, protocol binding.Protocol, is IsExternal: true, ClusterGroupID: 1, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) fp.syncProxyRules() @@ -2587,6 +2598,8 @@ func testServicePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 bool, updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort+1), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) + updatedSvcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort+1, apiProtocol) ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) @@ -2624,7 +2637,7 @@ func testServicePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 bool, IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), protocol) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2635,8 +2648,8 @@ func testServicePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 bool, IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2646,7 +2659,7 @@ func testServicePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 bool, ClusterGroupID: 1, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) s1 = mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), protocol) if needClearConntrackEntries(protocol) { @@ -2661,8 +2674,8 @@ func testServicePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 bool, }) s2.After(s1) - mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(updatedSvcInfoStr, loadBalancerIP) } fp.syncProxyRules() @@ -2742,6 +2755,7 @@ func testServiceNodePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 b updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort+1), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) @@ -2767,10 +2781,10 @@ func testServiceNodePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 b IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) s1 := mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) if needClearConntrackEntries(protocol) { for _, nodeIP := range nodePortAddresses { mockRouteClient.EXPECT().ClearConntrackEntryForService(nodeIP, uint16(svcNodePort), nil, protocol) @@ -2785,7 +2799,7 @@ func testServiceNodePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 b IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort+1), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort+1), protocol) s2.After(s1) } if svcType == corev1.ServiceTypeLoadBalancer { @@ -2796,7 +2810,7 @@ func testServiceNodePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 b ClusterGroupID: 1, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) } fp.syncProxyRules() @@ -2869,6 +2883,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto updatedSvc = svc.DeepCopy() updatedSvc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) remoteEp, remoteEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep1IP, int32(svcPort), apiProtocol, false) localEp, localEpPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, ep2IP, int32(svcPort), apiProtocol, true) @@ -2897,7 +2912,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto ClusterGroupID: 1, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2908,7 +2923,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2918,7 +2933,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto ClusterGroupID: 1, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -2946,8 +2961,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto TrafficPolicyLocal: true, IsExternal: true, }) - mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) - mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, externalIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, externalIP) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), protocol) @@ -2963,8 +2978,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto }) s2.After(s1) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) } if svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), protocol) @@ -2979,8 +2994,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto }) s2.After(s1) - mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -3153,24 +3168,25 @@ func testServiceExternalIPsUpdate(t *testing.T, protocol binding.Protocol, isIPv } fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll, withCleanupStaleUDPSvcConntrack) - var loadBalancerIPStrs, updatedLoadBalancerIPStrs []string + var loadBalancerIPStrings, updatedLoadBalancerIPStrings []string for _, ip := range loadBalancerIPs { - loadBalancerIPStrs = append(loadBalancerIPStrs, ip.String()) + loadBalancerIPStrings = append(loadBalancerIPStrings, ip.String()) } for _, ip := range updatedLoadBalancerIPs { - updatedLoadBalancerIPStrs = append(updatedLoadBalancerIPStrs, ip.String()) + updatedLoadBalancerIPStrings = append(updatedLoadBalancerIPStrings, ip.String()) } - var externalIPStrs, updatedExternalIPStrs []string + var externalIPStrings, updatedExternalIPStrings []string for _, ip := range externalIPs { - externalIPStrs = append(externalIPStrs, ip.String()) + externalIPStrings = append(externalIPStrings, ip.String()) } for _, ip := range updatedExternalIPs { - updatedExternalIPStrs = append(updatedExternalIPStrs, ip.String()) + updatedExternalIPStrings = append(updatedExternalIPStrings, ip.String()) } svc := makeTestLoadBalancerService(&svcPortName, svcIP, externalIPs, loadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, updatedExternalIPs, updatedLoadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) @@ -3202,6 +3218,7 @@ func testServiceExternalIPsUpdate(t *testing.T, protocol binding.Protocol, isIPv ClusterGroupID: 1, IsExternal: true, }) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, ip) } for _, ip := range externalIPs { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3211,52 +3228,51 @@ func testServiceExternalIPsUpdate(t *testing.T, protocol binding.Protocol, isIPv ClusterGroupID: 1, IsExternal: true, }) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, ip) } - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) - for _, ip := range loadBalancerIPs { - mockRouteClient.EXPECT().AddExternalIPRoute(ip) - } - for _, ip := range externalIPs { - mockRouteClient.EXPECT().AddExternalIPRoute(ip) - } + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) - toDeleteLoadBalancerIPs := smallSliceDifference(loadBalancerIPStrs, updatedLoadBalancerIPStrs) - toAddLoadBalancerIPs := smallSliceDifference(updatedLoadBalancerIPStrs, loadBalancerIPStrs) - for _, ipStr := range toDeleteLoadBalancerIPs { - mockOFClient.EXPECT().UninstallServiceFlows(net.ParseIP(ipStr), uint16(svcPort), protocol) - mockRouteClient.EXPECT().DeleteExternalIPRoute(net.ParseIP(ipStr)) + toDeleteLoadBalancerIPStrings := smallSliceDifference(loadBalancerIPStrings, updatedLoadBalancerIPStrings) + toAddLoadBalancerIPStrings := smallSliceDifference(updatedLoadBalancerIPStrings, loadBalancerIPStrings) + for _, ipStr := range toDeleteLoadBalancerIPStrings { + ip := net.ParseIP(ipStr) + mockOFClient.EXPECT().UninstallServiceFlows(ip, uint16(svcPort), protocol) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, ip) if needClearConntrackEntries(protocol) { - mockRouteClient.EXPECT().ClearConntrackEntryForService(net.ParseIP(ipStr), uint16(svcPort), nil, protocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(ip, uint16(svcPort), nil, protocol) } } - for _, ipStr := range toAddLoadBalancerIPs { + for _, ipStr := range toAddLoadBalancerIPStrings { + ip := net.ParseIP(ipStr) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: net.ParseIP(ipStr), + ServiceIP: ip, ServicePort: uint16(svcPort), Protocol: protocol, ClusterGroupID: 1, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(net.ParseIP(ipStr)) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, ip) } - toDeleteExternalIPs := smallSliceDifference(externalIPStrs, updatedExternalIPStrs) - toAddLoadExternalIPs := smallSliceDifference(updatedExternalIPStrs, externalIPStrs) - for _, ipStr := range toDeleteExternalIPs { - mockOFClient.EXPECT().UninstallServiceFlows(net.ParseIP(ipStr), uint16(svcPort), protocol) - mockRouteClient.EXPECT().DeleteExternalIPRoute(net.ParseIP(ipStr)) + toDeleteExternalIPStrings := smallSliceDifference(externalIPStrings, updatedExternalIPStrings) + toAddLoadExternalIPStrings := smallSliceDifference(updatedExternalIPStrings, externalIPStrings) + for _, ipStr := range toDeleteExternalIPStrings { + ip := net.ParseIP(ipStr) + mockOFClient.EXPECT().UninstallServiceFlows(ip, uint16(svcPort), protocol) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, ip) if needClearConntrackEntries(protocol) { - mockRouteClient.EXPECT().ClearConntrackEntryForService(net.ParseIP(ipStr), uint16(svcPort), nil, protocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(ip, uint16(svcPort), nil, protocol) } } - for _, ipStr := range toAddLoadExternalIPs { + for _, ipStr := range toAddLoadExternalIPStrings { + ip := net.ParseIP(ipStr) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: net.ParseIP(ipStr), + ServiceIP: ip, ServicePort: uint16(svcPort), Protocol: protocol, ClusterGroupID: 1, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(net.ParseIP(ipStr)) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, ip) } fp.syncProxyRules() @@ -3317,6 +3333,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, protocol binding.Protoco updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &updatedAffinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) @@ -3352,9 +3369,9 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, protocol binding.Protoco IsNodePort: true, AffinityTimeout: uint16(affinitySeconds), }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: virtualNodePortDNATIP, ServicePort: uint16(svcNodePort), @@ -3364,7 +3381,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, protocol binding.Protoco IsNodePort: true, AffinityTimeout: uint16(updatedAffinitySeconds), }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3375,9 +3392,9 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, protocol binding.Protoco AffinityTimeout: uint16(affinitySeconds), IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), protocol) - mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, loadBalancerIP) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: loadBalancerIP, ServicePort: uint16(svcPort), @@ -3386,7 +3403,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, protocol binding.Protoco AffinityTimeout: uint16(updatedAffinitySeconds), IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) } fp.syncProxyRules() @@ -3470,6 +3487,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, protocol binding.Protoco updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) @@ -3505,7 +3523,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, protocol binding.Protoco IsExternal: true, IsNodePort: true, }) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) mockOFClient.EXPECT().UninstallServiceFlows(virtualNodePortDNATIP, uint16(svcNodePort), protocol) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3517,8 +3535,8 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, protocol binding.Protoco IsNodePort: true, AffinityTimeout: uint16(affinitySeconds), }) - mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), protocol) - mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().DeleteNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3528,7 +3546,7 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, protocol binding.Protoco ClusterGroupID: 1, IsExternal: true, }) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), protocol) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -3539,8 +3557,8 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, protocol binding.Protoco IsExternal: true, AffinityTimeout: uint16(affinitySeconds), }) - mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP) - mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, loadBalancerIP) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) } fp.syncProxyRules() @@ -3779,7 +3797,7 @@ func TestGetServiceFlowKeys(t *testing.T) { makeEndpointSliceMap(fp, eps) } if tc.svc != nil && tc.eps != nil && tc.serviceInstalled { - mockRouteClient.EXPECT().AddNodePort(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP) + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), gomock.Any(), gomock.Any()) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), gomock.Any(), gomock.Any()) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 5355efbf3c6..dac45c2747c 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -76,17 +76,19 @@ type Interface interface { // DeleteEgressRule deletes the IP rule installed by AddEgressRule. DeleteEgressRule(tableID uint32, mark uint32) error - // AddNodePort adds configurations when a NodePort Service is created. - AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error + // AddNodePortConfigs adds routing configurations for redirecting traffic to OVS when a NodePort Service is created. + AddNodePortConfigs(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error - // DeleteNodePort deletes related configurations when a NodePort Service is deleted. - DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error + // DeleteNodePortConfigs deletes corresponding routing configurations when a NodePort Service is deleted. + DeleteNodePortConfigs(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error - // AddExternalIPRoute adds a route entry when an external IP is added. - AddExternalIPRoute(externalIP net.IP) error + // AddExternalIPConfigs adds routing configurations for redirecting traffic to OVS when an external Service IP + // (externalIP or LoadBalancerIP) is created. + AddExternalIPConfigs(svcInfoStr string, externalIP net.IP) error - // DeleteExternalIPRoute deletes the related route entry when an external IP is deleted. - DeleteExternalIPRoute(externalIP net.IP) error + // DeleteExternalIPConfigs deletes corresponding routing configurations for redirecting traffic to OVS when an + // external Service IP (externalIP or LoadBalancerIP) is deleted. + DeleteExternalIPConfigs(svcInfoStr string, externalIP net.IP) error // Run starts the sync loop. Run(stopCh <-chan struct{}) diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index ac61dcd2d9d..e223081868e 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -21,6 +21,7 @@ import ( "reflect" "sort" "strconv" + "strings" "sync" "time" @@ -66,9 +67,11 @@ const ( // clusterNodeIP6Set contains all other Node IP6s in the cluster. clusterNodeIP6Set = "CLUSTER-NODE-IP6" - // Antrea proxy NodePort IP - antreaNodePortIPSet = "ANTREA-NODEPORT-IP" - antreaNodePortIP6Set = "ANTREA-NODEPORT-IP6" + // Antrea managed ipsets for different types of Service IP addresses and ports. + antreaNodePortIPSet = "ANTREA-NODEPORT-IP" + antreaNodePortIP6Set = "ANTREA-NODEPORT-IP6" + antreaExternalIPIPSet = "ANTREA-EXTERNAL-IP" + antreaExternalIPIP6Set = "ANTREA-EXTERNAL-IP6" // Antrea managed iptables chains. antreaForwardChain = "ANTREA-FORWARD" @@ -78,6 +81,8 @@ const ( antreaOutputChain = "ANTREA-OUTPUT" antreaMangleChain = "ANTREA-MANGLE" + kubeProxyServiceChain = "KUBE-SERVICES" + serviceIPv4CIDRKey = "serviceIPv4CIDRKey" serviceIPv6CIDRKey = "serviceIPv6CIDRKey" @@ -121,12 +126,17 @@ type Client struct { nodeNetworkPolicyEnabled bool // serviceRoutes caches ip routes about Services. serviceRoutes sync.Map + // serviceExternalIPReferences tracks the references of Service IP. The key is the Service IP and the value is + // the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a + // ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts. + // With the references, we install the configurations for a Service IP exactly once as long as it's used by any + // ServicePorts and uninstall it exactly once when it's no longer used by any ServicePorts. + // It applies to externalIP and LoadBalancerIP. + serviceExternalIPReferences map[string]sets.Set[string] // serviceNeighbors caches neighbors. serviceNeighbors sync.Map - // nodePortsIPv4 caches all existing IPv4 NodePorts. - nodePortsIPv4 sync.Map - // nodePortsIPv6 caches all existing IPv6 NodePorts. - nodePortsIPv6 sync.Map + // serviceIPSets caches ipsets about Services. + serviceIPSets map[string]*sync.Map // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster clusterNodeIPs sync.Map // clusterNodeIP6s stores the IPv6 address of all other Nodes in the cluster. It is maintained but not consumed @@ -159,16 +169,23 @@ func NewClient(networkConfig *config.NetworkConfig, multicastEnabled bool, serviceCIDRProvider servicecidr.Interface) (*Client, error) { return &Client{ - networkConfig: networkConfig, - noSNAT: noSNAT, - proxyAll: proxyAll, - multicastEnabled: multicastEnabled, - connectUplinkToBridge: connectUplinkToBridge, - nodeNetworkPolicyEnabled: nodeNetworkPolicyEnabled, - ipset: ipset.NewClient(), - netlink: &netlink.Handle{}, - isCloudEKS: env.IsCloudEKS(), - serviceCIDRProvider: serviceCIDRProvider, + networkConfig: networkConfig, + noSNAT: noSNAT, + proxyAll: proxyAll, + multicastEnabled: multicastEnabled, + connectUplinkToBridge: connectUplinkToBridge, + nodeNetworkPolicyEnabled: nodeNetworkPolicyEnabled, + ipset: ipset.NewClient(), + netlink: &netlink.Handle{}, + isCloudEKS: env.IsCloudEKS(), + serviceCIDRProvider: serviceCIDRProvider, + serviceExternalIPReferences: make(map[string]sets.Set[string]), + serviceIPSets: map[string]*sync.Map{ + antreaNodePortIPSet: {}, + antreaNodePortIP6Set: {}, + antreaExternalIPIPSet: {}, + antreaExternalIPIP6Set: {}, + }, }, nil } @@ -388,27 +405,27 @@ func (c *Client) syncIPSet() error { // AntreaProxy proxyAll is available in all traffic modes. If proxyAll is enabled, create the ipsets to store the // pairs of Node IP and NodePort. if c.proxyAll { - if err := c.ipset.CreateIPSet(antreaNodePortIPSet, ipset.HashIPPort, false); err != nil { - return err - } - if err := c.ipset.CreateIPSet(antreaNodePortIP6Set, ipset.HashIPPort, true); err != nil { - return err - } + for ipsetName, ipsetEntries := range c.serviceIPSets { + isIPv6 := ipsetName == antreaNodePortIP6Set || ipsetName == antreaExternalIPIP6Set - c.nodePortsIPv4.Range(func(k, _ interface{}) bool { - ipSetEntry := k.(string) - if err := c.ipset.AddEntry(antreaNodePortIPSet, ipSetEntry); err != nil { - return false + var ipsetType ipset.SetType + if ipsetName == antreaNodePortIP6Set || ipsetName == antreaNodePortIPSet { + ipsetType = ipset.HashIPPort + } else { + ipsetType = ipset.HashIP } - return true - }) - c.nodePortsIPv6.Range(func(k, _ interface{}) bool { - ipSetEntry := k.(string) - if err := c.ipset.AddEntry(antreaNodePortIP6Set, ipSetEntry); err != nil { - return false + + if err := c.ipset.CreateIPSet(ipsetName, ipsetType, isIPv6); err != nil { + return err } - return true - }) + ipsetEntries.Range(func(k, _ interface{}) bool { + ipsetEntry := k.(string) + if err := c.ipset.AddEntry(ipsetName, ipsetEntry); err != nil { + return false + } + return true + }) + } } // AntreaIPAM is available in noEncap mode. There is a validation in Antrea configuration about this traffic mode @@ -494,6 +511,14 @@ func getNodePortIPSetName(isIPv6 bool) string { } } +func getExternalIPIPSetName(isIPv6 bool) string { + if isIPv6 { + return antreaExternalIPIP6Set + } else { + return antreaExternalIPIPSet + } +} + func getLocalAntreaFlexibleIPAMPodIPSetName(isIPv6 bool) string { if isIPv6 { return localAntreaFlexibleIPAMPodIP6Set @@ -562,43 +587,101 @@ func (c *Client) writeEKSNATRules(iptablesData *bytes.Buffer) { }...) } +func (c *Client) getIPProtocol() iptables.Protocol { + switch { + case c.networkConfig.IPv4Enabled && c.networkConfig.IPv6Enabled: + return iptables.ProtocolDual + case c.networkConfig.IPv6Enabled: + return iptables.ProtocolIPv6 + default: + return iptables.ProtocolIPv4 + } +} + +// Create the antrea managed chains and link them to built-in chains. +// We cannot use iptables-restore for these jump rules because there +// are non antrea managed rules in built-in chains. +type jumpRule struct { + table string + srcChain string + dstChain string + comment string + insert bool +} + +func (c *Client) removeUnexpectedAntreaJumpRule(protocol iptables.Protocol, jumpRule jumpRule) error { + // List all the existing rules of the table and the chain where the Antrea jump rule will be added. + allExistingRules, err := c.iptables.ListRules(protocol, jumpRule.table, jumpRule.srcChain) + if err != nil { + return err + } + + // Construct keywords to identify Antrea and kube-proxy jump rules. + antreaJumpRuleKeyword := fmt.Sprintf("-j %s", jumpRule.dstChain) + kubeProxyJumpRuleKeyword := fmt.Sprintf("-j %s", kubeProxyServiceChain) + + for ipProtocol, rules := range allExistingRules { + var antreaJumpRuleIndex, kubeProxyJumpRuleIndex = -1, -1 + + for index, rule := range rules { + // Check if the current rule is the Antrea jump rule to be added. + if strings.Contains(rule, antreaJumpRuleKeyword) { + antreaJumpRuleIndex = index + // Check if the current rule is the kube-proxy jump rule. + } else if strings.Contains(rule, kubeProxyJumpRuleKeyword) { + kubeProxyJumpRuleIndex = index + } + } + // If the Antrea jump rule is installed after the kube-proxy jump rule, which is not expected, delete the + // existing Antrea jump rule to ensure that a new one will be installed before the kube-proxy one when syncing iptables. + if antreaJumpRuleIndex != -1 && kubeProxyJumpRuleIndex != -1 && antreaJumpRuleIndex > kubeProxyJumpRuleIndex { + ruleSpec := []string{"-j", jumpRule.dstChain, "-m", "comment", "--comment", jumpRule.comment} + if err := c.iptables.DeleteRule(ipProtocol, jumpRule.table, jumpRule.srcChain, ruleSpec); err != nil { + return err + } + } + } + return nil +} + // syncIPTables ensure that the iptables infrastructure we use is set up. // It's idempotent and can safely be called on every startup. func (c *Client) syncIPTables() error { - // Create the antrea managed chains and link them to built-in chains. - // We cannot use iptables-restore for these jump rules because there - // are non antrea managed rules in built-in chains. - type jumpRule struct { - table string - srcChain string - dstChain string - comment string - } + ipProtocol := c.getIPProtocol() jumpRules := []jumpRule{ - {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}, - {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, - {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules"}, - {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules"}, - {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules"}, // TODO: unify the chain naming style - {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, + {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", false}, + {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules", false}, + {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules", false}, + {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules", false}, // TODO: unify the chain naming style + {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, } if c.proxyAll || c.isCloudEKS { - jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}) + jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", c.proxyAll == true}) } if c.proxyAll { - jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}) + jumpRules = append(jumpRules, jumpRule{iptables.NATTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", true}) } if c.nodeNetworkPolicyEnabled { - jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.InputChain, antreaInputChain, "Antrea: jump to Antrea input rules"}) - jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}) + jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.InputChain, antreaInputChain, "Antrea: jump to Antrea input rules", false}) + jumpRules = append(jumpRules, jumpRule{iptables.FilterTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}) } for _, rule := range jumpRules { - if err := c.iptables.EnsureChain(iptables.ProtocolDual, rule.table, rule.dstChain); err != nil { + if err := c.iptables.EnsureChain(ipProtocol, rule.table, rule.dstChain); err != nil { return err } ruleSpec := []string{"-j", rule.dstChain, "-m", "comment", "--comment", rule.comment} - if err := c.iptables.AppendRule(iptables.ProtocolDual, rule.table, rule.srcChain, ruleSpec); err != nil { - return err + if rule.insert { + if err := c.removeUnexpectedAntreaJumpRule(ipProtocol, rule); err != nil { + return err + } + if err := c.iptables.InsertRule(ipProtocol, rule.table, rule.srcChain, ruleSpec); err != nil { + return err + } + } else { + if err := c.iptables.AppendRule(ipProtocol, rule.table, rule.srcChain, ruleSpec); err != nil { + return err + } } } @@ -636,6 +719,7 @@ func (c *Client) syncIPTables() error { antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, antreaNodePortIPSet, + antreaExternalIPIPSet, clusterNodeIPSet, config.VirtualNodePortDNATIPv4, config.VirtualServiceIPv4, @@ -655,6 +739,7 @@ func (c *Client) syncIPTables() error { antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, antreaNodePortIP6Set, + antreaExternalIPIP6Set, clusterNodeIP6Set, config.VirtualNodePortDNATIPv6, config.VirtualServiceIPv6, @@ -674,6 +759,7 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, nodePortIPSet, + externalIPSet, clusterNodeIPSet string, nodePortDNATVirtualIP, serviceVirtualIP net.IP, @@ -731,6 +817,32 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, }...) } } + if c.proxyAll { + writeLine(iptablesData, []string{ + "-A", antreaPreRoutingChain, + "-m", "comment", "--comment", `"Antrea: do not track external to external IP request packets"`, + "-m", "set", "--match-set", externalIPSet, "dst", + "-j", iptables.NotrackTarget, + }...) + writeLine(iptablesData, []string{ + "-A", antreaPreRoutingChain, + "-m", "comment", "--comment", `"Antrea: do not track external to external IP reply packets"`, + "-m", "set", "--match-set", externalIPSet, "src", + "-j", iptables.NotrackTarget, + }...) + writeLine(iptablesData, []string{ + "-A", antreaOutputChain, + "-m", "comment", "--comment", `"Antrea: do not track local to external IP request packets"`, + "-m", "set", "--match-set", externalIPSet, "dst", + "-j", iptables.NotrackTarget, + }...) + writeLine(iptablesData, []string{ + "-A", antreaOutputChain, + "-m", "comment", "--comment", `"Antrea: do not track local to external IP reply packets"`, + "-m", "set", "--match-set", externalIPSet, "src", + "-j", iptables.NotrackTarget, + }...) + } writeLine(iptablesData, "COMMIT") // Write head lines anyway so the undesired rules can be deleted when noEncap -> encap. @@ -1614,9 +1726,9 @@ func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error { return nil } -// AddNodePort is used to add IP,port:protocol entries to target ip set when a NodePort Service is added. An entry is added -// for every NodePort IP. -func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +// AddNodePortConfigs is used to add IP,protocol:port entries to target ipset when a NodePort Service is added. An +// entry is added for every NodePort IP. +func (c *Client) AddNodePortConfigs(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { isIPv6 := isIPv6Protocol(protocol) transProtocol := getTransProtocolStr(protocol) ipSetName := getNodePortIPSetName(isIPv6) @@ -1626,19 +1738,15 @@ func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol b if err := c.ipset.AddEntry(ipSetName, ipSetEntry); err != nil { return err } - if isIPv6 { - c.nodePortsIPv6.Store(ipSetEntry, struct{}{}) - } else { - c.nodePortsIPv4.Store(ipSetEntry, struct{}{}) - } + c.serviceIPSets[ipSetName].Store(ipSetEntry, struct{}{}) klog.V(4).InfoS("Added ipset for NodePort", "IP", nodePortAddresses[i], "Port", port, "Protocol", protocol) } return nil } -// DeleteNodePort is used to delete related IP set entries when a NodePort Service is deleted. -func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +// DeleteNodePortConfigs is used to delete corresponding ipset entries when a NodePort Service is deleted. +func (c *Client) DeleteNodePortConfigs(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { isIPv6 := isIPv6Protocol(protocol) transProtocol := getTransProtocolStr(protocol) ipSetName := getNodePortIPSetName(isIPv6) @@ -1648,11 +1756,8 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco if err := c.ipset.DelEntry(ipSetName, ipSetEntry); err != nil { return err } - if isIPv6 { - c.nodePortsIPv6.Delete(ipSetEntry) - } else { - c.nodePortsIPv4.Delete(ipSetEntry) - } + c.serviceIPSets[ipSetName].Delete(ipSetEntry) + klog.V(4).InfoS("Deleted ipset entry for NodePort IP", "IP", nodePortAddresses[i], "Port", port, "Protocol", protocol) } return nil @@ -1752,47 +1857,82 @@ func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error { return nil } -// AddExternalIPRoute adds a route entry that forwards traffic destined for the external IP to the Antrea gateway interface. -func (c *Client) AddExternalIPRoute(externalIP net.IP) error { +// AddExternalIPConfigs adds a route entry to forward traffic destined for the external Service IP to the Antrea +// gateway interface. Additionally, it adds the IP to the ipset ANTREA-EXTERNAL-IP or ANTREA-EXTERNAL-IP6, which is +// used by iptables rules to bypass kube-proxy. +func (c *Client) AddExternalIPConfigs(svcInfoStr string, externalIP net.IP) error { externalIPStr := externalIP.String() - linkIndex := c.nodeConfig.GatewayConfig.LinkIndex isIPv6 := utilnet.IsIPv6(externalIP) - var gw net.IP - var mask int - if !isIPv6 { - gw = config.VirtualServiceIPv4 - mask = net.IPv4len * 8 - } else { - gw = config.VirtualServiceIPv6 - mask = net.IPv6len * 8 - } + references, exists := c.serviceExternalIPReferences[externalIPStr] + if !exists { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + var gw net.IP + var mask int + if !isIPv6 { + gw = config.VirtualServiceIPv4 + mask = net.IPv4len * 8 + } else { + gw = config.VirtualServiceIPv6 + mask = net.IPv6len * 8 + } + route := generateRoute(externalIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE) + if err := c.netlink.RouteReplace(route); err != nil { + return fmt.Errorf("failed to add route for external IP %s: %w", externalIPStr, err) + } + klog.V(4).InfoS("Added route for external IP", "IP", externalIPStr) - route := generateRoute(externalIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE) - if err := c.netlink.RouteReplace(route); err != nil { - return fmt.Errorf("failed to install route for external IP %s: %w", externalIPStr, err) + ipsetName := getExternalIPIPSetName(isIPv6) + if err := c.ipset.AddEntry(ipsetName, externalIPStr); err != nil { + return fmt.Errorf("failed to add %s to ipset %s", externalIPStr, ipsetName) + } + klog.V(4).InfoS("Added external IP to ipset", "IPSet", ipsetName, "IP", externalIPStr) + + references = sets.New[string](svcInfoStr) + c.serviceExternalIPReferences[externalIPStr] = references + c.serviceRoutes.Store(externalIPStr, route) + c.serviceIPSets[ipsetName].Store(externalIPStr, struct{}{}) + } else { + references.Insert(svcInfoStr) } - c.serviceRoutes.Store(externalIPStr, route) - klog.V(4).InfoS("Added route for external IP", "IP", externalIPStr) return nil } -// DeleteExternalIPRoute deletes the route entry for the external IP. -func (c *Client) DeleteExternalIPRoute(externalIP net.IP) error { +// DeleteExternalIPConfigs deletes the route entry to forward traffic destined for the external Service IP to the Antrea +// gateway interface. Additionally, it removes the IP to the ipset ANTREA-EXTERNAL-IP or ANTREA-EXTERNAL-IP6, which is +// used by iptables rules to bypass kube-proxy. +func (c *Client) DeleteExternalIPConfigs(svcInfoStr string, externalIP net.IP) error { externalIPStr := externalIP.String() - route, found := c.serviceRoutes.Load(externalIPStr) - if !found { - klog.V(2).InfoS("Didn't find route for external IP", "IP", externalIPStr) - return nil - } - if err := c.netlink.RouteDel(route.(*netlink.Route)); err != nil { - if err.Error() == "no such process" { - klog.InfoS("Failed to delete route for external IP since it doesn't exist", "IP", externalIPStr) + isIPv6 := utilnet.IsIPv6(externalIP) + references, exists := c.serviceExternalIPReferences[externalIPStr] + if exists && references.Has(svcInfoStr) { + if references.Len() == 1 { + route, found := c.serviceRoutes.Load(externalIPStr) + if !found { + klog.V(2).InfoS("Didn't find route for external IP", "IP", externalIPStr) + return nil + } + if err := c.netlink.RouteDel(route.(*netlink.Route)); err != nil { + if err.Error() == "no such process" { + klog.InfoS("Failed to delete route for external IP since it doesn't exist", "IP", externalIPStr) + } else { + return fmt.Errorf("failed to delete route for external IP %s: %w", externalIPStr, err) + } + } + klog.V(4).InfoS("Deleted route for external IP", "IP", externalIPStr) + + ipsetName := getExternalIPIPSetName(isIPv6) + if err := c.ipset.DelEntry(ipsetName, externalIPStr); err != nil { + return err + } + klog.V(4).InfoS("Deleted external IP from ipset", "IPSet", ipsetName, "IP", externalIPStr) + + delete(c.serviceExternalIPReferences, externalIPStr) + c.serviceRoutes.Delete(externalIPStr) + c.serviceIPSets[ipsetName].Delete(externalIPStr) } else { - return fmt.Errorf("failed to delete route for external IP %s: %w", externalIPStr, err) + references.Delete(svcInfoStr) } } - c.serviceRoutes.Delete(externalIPStr) - klog.V(4).InfoS("Deleted route for external IP", "IP", externalIPStr) return nil } diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index 5104f81c587..ecd386773c1 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/mock/gomock" "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/util/sets" + utilnet "k8s.io/utils/net" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow" @@ -52,6 +53,13 @@ var ( ipv4Route2 = generateRoute(net.ParseIP(externalIPv4Addr2), 32, config.VirtualServiceIPv4, 10, netlink.SCOPE_UNIVERSE) ipv6Route1 = generateRoute(net.ParseIP(externalIPv6Addr1), 128, config.VirtualServiceIPv6, 10, netlink.SCOPE_UNIVERSE) ipv6Route2 = generateRoute(net.ParseIP(externalIPv6Addr2), 128, config.VirtualServiceIPv6, 10, netlink.SCOPE_UNIVERSE) + + serviceIPSets = map[string]*sync.Map{ + antreaNodePortIPSet: {}, + antreaNodePortIP6Set: {}, + antreaExternalIPIPSet: {}, + antreaExternalIPIP6Set: {}, + } ) func TestSyncRoutes(t *testing.T) { @@ -157,8 +165,7 @@ func TestSyncIPSet(t *testing.T) { nodeNetworkPolicyEnabled bool networkConfig *config.NetworkConfig nodeConfig *config.NodeConfig - nodePortsIPv4 []string - nodePortsIPv6 []string + serviceIPSets map[string][]string clusterNodeIPs map[string]string clusterNodeIP6s map[string]string nodeNetworkPolicyIPSetsIPv4 map[string]sets.Set[string] @@ -204,8 +211,12 @@ func TestSyncIPSet(t *testing.T) { PodIPv4CIDR: podCIDR, PodIPv6CIDR: podCIDRv6, }, - nodePortsIPv4: []string{"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"}, - nodePortsIPv6: []string{"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"}, + serviceIPSets: map[string][]string{ + antreaNodePortIPSet: {"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"}, + antreaNodePortIP6Set: {"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"}, + antreaExternalIPIPSet: {"192.168.0.200", "192.168.0.201", "192.168.0.150", "192.168.0.151"}, + antreaExternalIPIP6Set: {"2001::192:168:0:200", "2001::192:168:0:201", "2001::192:168:0:150", "2001::192:168:0:151"}, + }, clusterNodeIPs: map[string]string{"172.16.3.0/24": "192.168.0.3", "172.16.4.0/24": "192.168.0.4"}, clusterNodeIP6s: map[string]string{"2001:ab03:cd04:5503::/64": "fe80::e643:4bff:fe03", "2001:ab03:cd04:5504::/64": "fe80::e643:4bff:fe04"}, nodeNetworkPolicyIPSetsIPv4: map[string]sets.Set[string]{"ANTREA-POL-RULE1-4": sets.New[string]("1.1.1.1/32", "2.2.2.2/32")}, @@ -221,6 +232,16 @@ func TestSyncIPSet(t *testing.T) { mockIPSet.AddEntry(antreaNodePortIPSet, "127.0.0.1,tcp:10000") mockIPSet.AddEntry(antreaNodePortIP6Set, "fe80::e643:4bff:fe44:ee,tcp:10000") mockIPSet.AddEntry(antreaNodePortIP6Set, "::1,tcp:10000") + mockIPSet.CreateIPSet(antreaExternalIPIPSet, ipset.HashIP, false) + mockIPSet.CreateIPSet(antreaExternalIPIP6Set, ipset.HashIP, true) + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.150") + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.151") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:150") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:151") + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.200") + mockIPSet.AddEntry(antreaExternalIPIPSet, "192.168.0.201") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:200") + mockIPSet.AddEntry(antreaExternalIPIP6Set, "2001::192:168:0:201") mockIPSet.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false) mockIPSet.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true) mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.3") @@ -268,16 +289,15 @@ func TestSyncIPSet(t *testing.T) { multicastEnabled: tt.multicastEnabled, connectUplinkToBridge: tt.connectUplinkToBridge, nodeNetworkPolicyEnabled: tt.nodeNetworkPolicyEnabled, - nodePortsIPv4: sync.Map{}, - nodePortsIPv6: sync.Map{}, clusterNodeIPs: sync.Map{}, clusterNodeIP6s: sync.Map{}, + serviceIPSets: make(map[string]*sync.Map), } - for _, nodePortIPv4 := range tt.nodePortsIPv4 { - c.nodePortsIPv4.Store(nodePortIPv4, struct{}{}) - } - for _, nodePortIPv6 := range tt.nodePortsIPv6 { - c.nodePortsIPv6.Store(nodePortIPv6, struct{}{}) + for ipsetName, ipsetEntries := range tt.serviceIPSets { + c.serviceIPSets[ipsetName] = &sync.Map{} + for _, entry := range ipsetEntries { + c.serviceIPSets[ipsetName].Store(entry, struct{}{}) + } } for cidr, nodeIP := range tt.clusterNodeIPs { c.clusterNodeIPs.Store(cidr, nodeIP) @@ -307,8 +327,6 @@ func TestSyncIPTables(t *testing.T) { nodeNetworkPolicyEnabled bool networkConfig *config.NetworkConfig nodeConfig *config.NodeConfig - nodePortsIPv4 []string - nodePortsIPv6 []string markToSNATIP map[uint32]string expectedCalls func(iptables *iptablestest.MockInterfaceMockRecorder) }{ @@ -347,10 +365,34 @@ func TestSyncIPTables(t *testing.T) { mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.PreRoutingChain, []string{"-j", antreaMangleChain, "-m", "comment", "--comment", "Antrea: jump to Antrea mangle rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.MangleTable, antreaOutputChain) mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.ListRules(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain).Return( + map[iptables.Protocol][]string{ + iptables.ProtocolIPv4: { + "-A " + iptables.PreRoutingChain + " -j " + kubeProxyServiceChain, + "-A " + iptables.PreRoutingChain + " -j " + antreaPreRoutingChain, + }, + iptables.ProtocolIPv6: { + "-A " + iptables.PreRoutingChain + " -j " + antreaPreRoutingChain, + "-A " + iptables.PreRoutingChain + " -j " + kubeProxyServiceChain, + }, + }, nil) + mockIPTables.DeleteRule(iptables.ProtocolIPv4, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) + mockIPTables.ListRules(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain).Return( + map[iptables.Protocol][]string{ + iptables.ProtocolIPv4: { + "-A " + iptables.OutputChain + " -j " + antreaOutputChain, + "-A " + iptables.OutputChain + " -j " + kubeProxyServiceChain, + }, + iptables.ProtocolIPv6: { + "-A " + iptables.OutputChain + " -j " + kubeProxyServiceChain, + "-A " + iptables.OutputChain + " -j " + antreaOutputChain, + }, + }, nil) + mockIPTables.DeleteRule(iptables.ProtocolIPv6, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaPreRoutingChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) + mockIPTables.InsertRule(iptables.ProtocolDual, iptables.NATTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaOutputChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.InsertRule(iptables.ProtocolDual, iptables.NATTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaInputChain) mockIPTables.AppendRule(iptables.ProtocolDual, iptables.FilterTable, iptables.InputChain, []string{"-j", antreaInputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea input rules"}) mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaOutputChain) @@ -361,6 +403,10 @@ func TestSyncIPTables(t *testing.T) { -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK -A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP src -d 224.0.0.0/4 -j DROP +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to external IP request packets" -m set --match-set ANTREA-EXTERNAL-IP dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to external IP reply packets" -m set --match-set ANTREA-EXTERNAL-IP src -j NOTRACK +-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track local to external IP request packets" -m set --match-set ANTREA-EXTERNAL-IP dst -j NOTRACK +-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track local to external IP reply packets" -m set --match-set ANTREA-EXTERNAL-IP src -j NOTRACK COMMIT *mangle :ANTREA-MANGLE - [0:0] @@ -404,6 +450,10 @@ COMMIT :ANTREA-OUTPUT - [0:0] -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to external IP request packets" -m set --match-set ANTREA-EXTERNAL-IP6 dst -j NOTRACK +-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to external IP reply packets" -m set --match-set ANTREA-EXTERNAL-IP6 src -j NOTRACK +-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track local to external IP request packets" -m set --match-set ANTREA-EXTERNAL-IP6 dst -j NOTRACK +-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track local to external IP reply packets" -m set --match-set ANTREA-EXTERNAL-IP6 src -j NOTRACK COMMIT *mangle :ANTREA-MANGLE - [0:0] @@ -540,18 +590,18 @@ COMMIT }, }, expectedCalls: func(mockIPTables *iptablestest.MockInterfaceMockRecorder) { - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.RawTable, antreaPreRoutingChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.RawTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.RawTable, antreaOutputChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.RawTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.FilterTable, antreaForwardChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.FilterTable, iptables.ForwardChain, []string{"-j", antreaForwardChain, "-m", "comment", "--comment", "Antrea: jump to Antrea forwarding rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.NATTable, antreaPostRoutingChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.NATTable, iptables.PostRoutingChain, []string{"-j", antreaPostRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea postrouting rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.MangleTable, antreaMangleChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.PreRoutingChain, []string{"-j", antreaMangleChain, "-m", "comment", "--comment", "Antrea: jump to Antrea mangle rules"}) - mockIPTables.EnsureChain(iptables.ProtocolDual, iptables.MangleTable, antreaOutputChain) - mockIPTables.AppendRule(iptables.ProtocolDual, iptables.MangleTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.RawTable, antreaPreRoutingChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.RawTable, iptables.PreRoutingChain, []string{"-j", antreaPreRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea prerouting rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.RawTable, antreaOutputChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.RawTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.FilterTable, antreaForwardChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.FilterTable, iptables.ForwardChain, []string{"-j", antreaForwardChain, "-m", "comment", "--comment", "Antrea: jump to Antrea forwarding rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.NATTable, antreaPostRoutingChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.NATTable, iptables.PostRoutingChain, []string{"-j", antreaPostRoutingChain, "-m", "comment", "--comment", "Antrea: jump to Antrea postrouting rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.MangleTable, antreaMangleChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.MangleTable, iptables.PreRoutingChain, []string{"-j", antreaMangleChain, "-m", "comment", "--comment", "Antrea: jump to Antrea mangle rules"}) + mockIPTables.EnsureChain(iptables.ProtocolIPv4, iptables.MangleTable, antreaOutputChain) + mockIPTables.AppendRule(iptables.ProtocolIPv4, iptables.MangleTable, iptables.OutputChain, []string{"-j", antreaOutputChain, "-m", "comment", "--comment", "Antrea: jump to Antrea output rules"}) mockIPTables.Restore(`*raw :ANTREA-PREROUTING - [0:0] :ANTREA-OUTPUT - [0:0] @@ -1315,7 +1365,7 @@ func TestDeleteSNATRule(t *testing.T) { } } -func TestAddNodePort(t *testing.T) { +func TestAddNodePortConfigs(t *testing.T) { tests := []struct { name string nodePortAddresses []net.IP @@ -1354,14 +1404,18 @@ func TestAddNodePort(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) ipset := ipsettest.NewMockInterface(ctrl) - c := &Client{ipset: ipset} + c := &Client{ipset: ipset, + serviceIPSets: map[string]*sync.Map{ + antreaNodePortIPSet: {}, + antreaNodePortIP6Set: {}, + }} tt.expectedCalls(ipset.EXPECT()) - assert.NoError(t, c.AddNodePort(tt.nodePortAddresses, tt.port, tt.protocol)) + assert.NoError(t, c.AddNodePortConfigs(tt.nodePortAddresses, tt.port, tt.protocol)) }) } } -func TestDeleteNodePort(t *testing.T) { +func TestDeleteNodePortConfigs(t *testing.T) { tests := []struct { name string nodePortAddresses []net.IP @@ -1400,9 +1454,9 @@ func TestDeleteNodePort(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) ipset := ipsettest.NewMockInterface(ctrl) - c := &Client{ipset: ipset} + c := &Client{ipset: ipset, serviceIPSets: serviceIPSets} tt.expectedCalls(ipset.EXPECT()) - assert.NoError(t, c.DeleteNodePort(tt.nodePortAddresses, tt.port, tt.protocol)) + assert.NoError(t, c.DeleteNodePortConfigs(tt.nodePortAddresses, tt.port, tt.protocol)) }) } } @@ -1561,26 +1615,47 @@ func TestAddServiceCIDRRoute(t *testing.T) { } } -func TestAddExternalIPRoute(t *testing.T) { +func TestAddExternalIPConfigs(t *testing.T) { tests := []struct { - name string - externalIPs []string - expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) + name string + svcToExternalIPs map[string][]string + expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder, mockIPSet *ipsettest.MockInterfaceMockRecorder) + expectedServiceExternalIPReferences map[string]sets.Set[string] }{ { - name: "IPv4", - externalIPs: []string{externalIPv4Addr1, externalIPv4Addr2}, - expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + name: "IPv4", + svcToExternalIPs: map[string][]string{ + "svc1": {externalIPv4Addr1}, + "svc2": {externalIPv4Addr2}, + "svc3": {externalIPv4Addr1, externalIPv4Addr2}, + }, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder, mockIPSet *ipsettest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(ipv4Route1) mockNetlink.RouteReplace(ipv4Route2) + mockIPSet.AddEntry(antreaExternalIPIPSet, externalIPv4Addr1) + mockIPSet.AddEntry(antreaExternalIPIPSet, externalIPv4Addr2) + }, + expectedServiceExternalIPReferences: map[string]sets.Set[string]{ + externalIPv4Addr1: sets.New[string]("svc1", "svc3"), + externalIPv4Addr2: sets.New[string]("svc2", "svc3"), }, }, { - name: "IPv6", - externalIPs: []string{externalIPv6Addr1, externalIPv6Addr2}, - expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + name: "IPv6", + svcToExternalIPs: map[string][]string{ + "svc1": {externalIPv6Addr1}, + "svc2": {externalIPv6Addr2}, + "svc3": {externalIPv6Addr1, externalIPv6Addr2}, + }, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder, mockIPSet *ipsettest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(ipv6Route1) mockNetlink.RouteReplace(ipv6Route2) + mockIPSet.AddEntry(antreaExternalIPIP6Set, externalIPv6Addr1) + mockIPSet.AddEntry(antreaExternalIPIP6Set, externalIPv6Addr2) + }, + expectedServiceExternalIPReferences: map[string]sets.Set[string]{ + externalIPv6Addr1: sets.New[string]("svc1", "svc3"), + externalIPv6Addr2: sets.New[string]("svc2", "svc3"), }, }, } @@ -1588,48 +1663,82 @@ func TestAddExternalIPRoute(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) mockNetlink := netlinktest.NewMockInterface(ctrl) + mockIPSet := ipsettest.NewMockInterface(ctrl) c := &Client{ - netlink: mockNetlink, - nodeConfig: nodeConfig, + ipset: mockIPSet, + netlink: mockNetlink, + nodeConfig: nodeConfig, + serviceExternalIPReferences: make(map[string]sets.Set[string]), + serviceIPSets: map[string]*sync.Map{ + antreaExternalIPIPSet: {}, + antreaExternalIPIP6Set: {}, + }, } - tt.expectedCalls(mockNetlink.EXPECT()) + tt.expectedCalls(mockNetlink.EXPECT(), mockIPSet.EXPECT()) - for _, externalIP := range tt.externalIPs { - assert.NoError(t, c.AddExternalIPRoute(net.ParseIP(externalIP))) + for svcInfo, externalIPs := range tt.svcToExternalIPs { + for _, externalIP := range externalIPs { + assert.NoError(t, c.AddExternalIPConfigs(svcInfo, net.ParseIP(externalIP))) + } } + assert.Equal(t, tt.expectedServiceExternalIPReferences, c.serviceExternalIPReferences) }) } } func TestDeleteExternalIPRoute(t *testing.T) { tests := []struct { - name string - serviceRoutes map[string]*netlink.Route - externalIPs []string - expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) + name string + svcToExternalIPs map[string][]string + serviceRoutes map[string]*netlink.Route + serviceExternalIPReferences map[string]sets.Set[string] + externalIPs []string + expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder, mockIPSet *ipsettest.MockInterfaceMockRecorder) }{ { name: "IPv4", + svcToExternalIPs: map[string][]string{ + "svc1": {externalIPv4Addr1}, + "svc2": {externalIPv4Addr2}, + "svc3": {externalIPv4Addr1, externalIPv4Addr2}, + }, serviceRoutes: map[string]*netlink.Route{ externalIPv4Addr1: ipv4Route1, externalIPv4Addr2: ipv4Route2, }, + serviceExternalIPReferences: map[string]sets.Set[string]{ + externalIPv4Addr1: sets.New[string]("svc1", "svc3"), + externalIPv4Addr2: sets.New[string]("svc2", "svc3"), + }, externalIPs: []string{externalIPv4Addr1, externalIPv4Addr2}, - expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder, mockIPSet *ipsettest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(ipv4Route1) mockNetlink.RouteDel(ipv4Route2) + mockIPSet.DelEntry(antreaExternalIPIPSet, externalIPv4Addr1) + mockIPSet.DelEntry(antreaExternalIPIPSet, externalIPv4Addr2) }, }, { name: "IPv6", + svcToExternalIPs: map[string][]string{ + "svc1": {externalIPv6Addr1}, + "svc2": {externalIPv6Addr2}, + "svc3": {externalIPv6Addr1, externalIPv6Addr2}, + }, serviceRoutes: map[string]*netlink.Route{ externalIPv6Addr1: ipv6Route1, externalIPv6Addr2: ipv6Route2, }, + serviceExternalIPReferences: map[string]sets.Set[string]{ + externalIPv6Addr1: sets.New[string]("svc1", "svc3"), + externalIPv6Addr2: sets.New[string]("svc2", "svc3"), + }, externalIPs: []string{externalIPv6Addr1, externalIPv6Addr2}, - expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder, mockIPSet *ipsettest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(ipv6Route1) mockNetlink.RouteDel(ipv6Route2) + mockIPSet.DelEntry(antreaExternalIPIP6Set, externalIPv6Addr1) + mockIPSet.DelEntry(antreaExternalIPIP6Set, externalIPv6Addr2) }, }, } @@ -1637,19 +1746,33 @@ func TestDeleteExternalIPRoute(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) mockNetlink := netlinktest.NewMockInterface(ctrl) + mockIPSet := ipsettest.NewMockInterface(ctrl) c := &Client{ - netlink: mockNetlink, - nodeConfig: nodeConfig, - serviceRoutes: sync.Map{}, + ipset: mockIPSet, + netlink: mockNetlink, + nodeConfig: nodeConfig, + serviceExternalIPReferences: tt.serviceExternalIPReferences, + serviceIPSets: map[string]*sync.Map{ + antreaExternalIPIPSet: {}, + antreaExternalIPIP6Set: {}, + }, } for ipStr, route := range tt.serviceRoutes { c.serviceRoutes.Store(ipStr, route) + if utilnet.IsIPv6String(ipStr) { + c.serviceIPSets[antreaExternalIPIP6Set].Store(ipStr, struct{}{}) + } else { + c.serviceIPSets[antreaExternalIPIPSet].Store(ipStr, struct{}{}) + } } - tt.expectedCalls(mockNetlink.EXPECT()) - for _, externalIP := range tt.externalIPs { - assert.NoError(t, c.DeleteExternalIPRoute(net.ParseIP(externalIP))) + tt.expectedCalls(mockNetlink.EXPECT(), mockIPSet.EXPECT()) + for svcInfo, externalIPs := range tt.svcToExternalIPs { + for _, externalIP := range externalIPs { + assert.NoError(t, c.DeleteExternalIPConfigs(svcInfo, net.ParseIP(externalIP))) + } } + assert.Equal(t, make(map[string]sets.Set[string]), c.serviceExternalIPReferences) }) } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 8fa585265a5..790599abd46 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -62,6 +62,13 @@ type Client struct { nodeRoutes sync.Map // serviceRoutes caches ip routes about Services. serviceRoutes sync.Map + // serviceExternalIPReferences tracks the references of Service IP. The key is the Service IP and the value is + // the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a + // ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts. + // With the references, we install the configurations for a Service IP exactly once as long as it's used by any + // ServicePorts and uninstall it exactly once when it's no longer used by any ServicePorts. + // It applies to externalIP and LoadBalancerIP. + serviceExternalIPReferences map[string]sets.Set[string] // netNatStaticMappings caches Windows NetNat for NodePort. netNatStaticMappings sync.Map fwClient *winfirewall.Client @@ -81,12 +88,13 @@ func NewClient(networkConfig *config.NetworkConfig, multicastEnabled bool, serviceCIDRProvider servicecidr.Interface) (*Client, error) { return &Client{ - networkConfig: networkConfig, - winnet: &winnet.Handle{}, - fwClient: winfirewall.NewClient(), - noSNAT: noSNAT, - proxyAll: proxyAll, - serviceCIDRProvider: serviceCIDRProvider, + networkConfig: networkConfig, + winnet: &winnet.Handle{}, + fwClient: winfirewall.NewClient(), + noSNAT: noSNAT, + proxyAll: proxyAll, + serviceCIDRProvider: serviceCIDRProvider, + serviceExternalIPReferences: make(map[string]sets.Set[string]), }, nil } @@ -476,7 +484,7 @@ func (c *Client) DeleteSNATRule(mark uint32) error { } // TODO: nodePortAddresses is not supported currently. -func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +func (c *Client) AddNodePortConfigs(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { netNatStaticMapping := &winnet.NetNatStaticMapping{ Name: antreaNatNodePort, ExternalIP: net.ParseIP("0.0.0.0"), @@ -493,7 +501,7 @@ func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol b return nil } -func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { +func (c *Client) DeleteNodePortConfigs(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol) error { key := fmt.Sprintf("%d-%s", port, protocol) obj, found := c.netNatStaticMappings.Load(key) if !found { @@ -509,36 +517,54 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco return nil } -// AddExternalIPRoute adds a route entry that forwards traffic destined for the external IP to the Antrea gateway interface. -func (c *Client) AddExternalIPRoute(externalIP net.IP) error { +// AddExternalIPConfigs adds a route entry to forward traffic destined for the external Service IP to the Antrea +// gateway interface. +func (c *Client) AddExternalIPConfigs(svcInfoStr string, externalIP net.IP) error { externalIPStr := externalIP.String() - linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - gw := config.VirtualServiceIPv4 - metric := winnet.MetricHigh - svcIPNet := util.NewIPNet(externalIP) + references, exists := c.serviceExternalIPReferences[externalIPStr] + if !exists { + linkIndex := c.nodeConfig.GatewayConfig.LinkIndex + gw := config.VirtualServiceIPv4 + metric := winnet.MetricHigh + svcIPNet := util.NewIPNet(externalIP) + route := generateRoute(svcIPNet, gw, linkIndex, metric) + if err := c.winnet.ReplaceNetRoute(route); err != nil { + return fmt.Errorf("failed to add route for external IP %s: %w", externalIPStr, err) + } + klog.V(4).InfoS("Added route for external IP", "IP", externalIPStr) - route := generateRoute(svcIPNet, gw, linkIndex, metric) - if err := c.winnet.ReplaceNetRoute(route); err != nil { - return fmt.Errorf("failed to install route for external IP %s: %w", externalIPStr, err) + references = sets.New[string](svcInfoStr) + c.serviceExternalIPReferences[externalIPStr] = references + c.serviceRoutes.Store(externalIPStr, route) + } else { + references.Insert(svcInfoStr) } - c.serviceRoutes.Store(externalIPStr, route) - klog.V(4).InfoS("Added route for external IP", "IP", externalIPStr) return nil } -// DeleteExternalIPRoute deletes the route entry for the external IP. -func (c *Client) DeleteExternalIPRoute(externalIP net.IP) error { +// DeleteExternalIPConfigs deletes the route entry to forward traffic destined for the external Service IP to the Antrea +// gateway interface. +func (c *Client) DeleteExternalIPConfigs(svcInfoStr string, externalIP net.IP) error { externalIPStr := externalIP.String() - route, found := c.serviceRoutes.Load(externalIPStr) - if !found { - klog.V(2).InfoS("Didn't find route for external IP", "IP", externalIPStr) - return nil - } - if err := c.winnet.RemoveNetRoute(route.(*winnet.Route)); err != nil { - return fmt.Errorf("failed to delete route for external IP %s: %w", externalIPStr, err) + references, exists := c.serviceExternalIPReferences[externalIPStr] + if exists && references.Has(svcInfoStr) { + if references.Len() == 1 { + route, found := c.serviceRoutes.Load(externalIPStr) + if !found { + klog.V(2).InfoS("Didn't find route for external IP", "IP", externalIPStr) + return nil + } + if err := c.winnet.RemoveNetRoute(route.(*winnet.Route)); err != nil { + return fmt.Errorf("failed to delete route for external IP %s: %w", externalIPStr, err) + } + klog.V(4).InfoS("Deleted route for external IP", "IP", externalIPStr) + + delete(c.serviceExternalIPReferences, externalIPStr) + c.serviceRoutes.Delete(externalIPStr) + } else { + references.Delete(svcInfoStr) + } } - c.serviceRoutes.Delete(externalIPStr) - klog.V(4).InfoS("Deleted route for external IP", "IP", externalIPStr) return nil } diff --git a/pkg/agent/route/route_windows_test.go b/pkg/agent/route/route_windows_test.go index 0f1648c1002..d9542ab0088 100644 --- a/pkg/agent/route/route_windows_test.go +++ b/pkg/agent/route/route_windows_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" + "k8s.io/apimachinery/pkg/util/sets" "antrea.io/antrea/pkg/agent/config" servicecidrtesting "antrea.io/antrea/pkg/agent/servicecidr/testing" @@ -260,17 +261,17 @@ func TestDeleteRoutes(t *testing.T) { assert.NoError(t, c.DeleteRoutes(podCIDR)) } -func TestAddNodePort(t *testing.T) { +func TestAddNodePortConf(t *testing.T) { ctrl := gomock.NewController(t) mockWinnet := winnettesting.NewMockInterface(ctrl) c := &Client{ winnet: mockWinnet, } mockWinnet.EXPECT().ReplaceNetNatStaticMapping(nodePortNetNatStaticMapping) - assert.NoError(t, c.AddNodePort(nil, nodePort, protocol)) + assert.NoError(t, c.AddNodePortConfigs(nil, nodePort, protocol)) } -func TestDeleteNodePort(t *testing.T) { +func TestDeleteNodePortConf(t *testing.T) { ctrl := gomock.NewController(t) mockWinnet := winnettesting.NewMockInterface(ctrl) c := &Client{ @@ -278,7 +279,7 @@ func TestDeleteNodePort(t *testing.T) { } c.netNatStaticMappings.Store(fmt.Sprintf("%d-%s", nodePort, protocol), nodePortNetNatStaticMapping) mockWinnet.EXPECT().RemoveNetNatStaticMapping(nodePortNetNatStaticMapping) - assert.NoError(t, c.DeleteNodePort(nil, nodePort, protocol)) + assert.NoError(t, c.DeleteNodePortConfigs(nil, nodePort, protocol)) } func TestAddServiceCIDRRoute(t *testing.T) { @@ -399,13 +400,22 @@ func TestAddServiceCIDRRoute(t *testing.T) { } } -func TestAddExternalIPRoute(t *testing.T) { - externalIPs := []string{externalIPv4Addr1, externalIPv4Addr2} +func TestAddExternalIPConfigs(t *testing.T) { + svcToExternalIPs := map[string][]string{ + "svc1": {externalIPv4Addr1}, + "svc2": {externalIPv4Addr2}, + "svc3": {externalIPv4Addr1, externalIPv4Addr2}, + } + expectedServiceExternalIPReferences := map[string]sets.Set[string]{ + externalIPv4Addr1: sets.New[string]("svc1", "svc3"), + externalIPv4Addr2: sets.New[string]("svc2", "svc3"), + } ctrl := gomock.NewController(t) mockWinnet := winnettesting.NewMockInterface(ctrl) c := &Client{ - winnet: mockWinnet, + winnet: mockWinnet, + serviceExternalIPReferences: make(map[string]sets.Set[string]), nodeConfig: &config.NodeConfig{ GatewayConfig: &config.GatewayConfig{ LinkIndex: 10, @@ -415,18 +425,29 @@ func TestAddExternalIPRoute(t *testing.T) { mockWinnet.EXPECT().ReplaceNetRoute(ipv4Route1) mockWinnet.EXPECT().ReplaceNetRoute(ipv4Route2) - for _, externalIP := range externalIPs { - assert.NoError(t, c.AddExternalIPRoute(net.ParseIP(externalIP))) + for svcInfo, externalIPs := range svcToExternalIPs { + for _, externalIP := range externalIPs { + assert.NoError(t, c.AddExternalIPConfigs(svcInfo, net.ParseIP(externalIP))) + } } + assert.Equal(t, expectedServiceExternalIPReferences, c.serviceExternalIPReferences) } -func TestDeleteExternalIPRoute(t *testing.T) { - externalIPs := []string{externalIPv4Addr1, externalIPv4Addr2} +func TestDeleteExternalIPConfigs(t *testing.T) { + svcToExternalIPs := map[string][]string{ + "svc1": {externalIPv4Addr1}, + "svc2": {externalIPv4Addr2}, + "svc3": {externalIPv4Addr1, externalIPv4Addr2}, + } ctrl := gomock.NewController(t) mockWinnet := winnettesting.NewMockInterface(ctrl) c := &Client{ winnet: mockWinnet, + serviceExternalIPReferences: map[string]sets.Set[string]{ + externalIPv4Addr1: sets.New[string]("svc1", "svc3"), + externalIPv4Addr2: sets.New[string]("svc2", "svc3"), + }, } for ipStr, route := range map[string]*winnet.Route{externalIPv4Addr1: ipv4Route1, externalIPv4Addr2: ipv4Route2} { c.serviceRoutes.Store(ipStr, route) @@ -434,8 +455,10 @@ func TestDeleteExternalIPRoute(t *testing.T) { mockWinnet.EXPECT().RemoveNetRoute(ipv4Route1) mockWinnet.EXPECT().RemoveNetRoute(ipv4Route2) - - for _, externalIP := range externalIPs { - assert.NoError(t, c.DeleteExternalIPRoute(net.ParseIP(externalIP))) + for svcInfo, externalIPs := range svcToExternalIPs { + for _, externalIP := range externalIPs { + assert.NoError(t, c.DeleteExternalIPConfigs(svcInfo, net.ParseIP(externalIP))) + } } + assert.Equal(t, make(map[string]sets.Set[string]), c.serviceExternalIPReferences) } diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 220eec914a7..80e34f315f8 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -85,18 +85,18 @@ func (mr *MockInterfaceMockRecorder) AddEgressRule(arg0, arg1 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddEgressRule", reflect.TypeOf((*MockInterface)(nil).AddEgressRule), arg0, arg1) } -// AddExternalIPRoute mocks base method. -func (m *MockInterface) AddExternalIPRoute(arg0 net.IP) error { +// AddExternalIPConfigs mocks base method. +func (m *MockInterface) AddExternalIPConfigs(arg0 string, arg1 net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddExternalIPRoute", arg0) + ret := m.ctrl.Call(m, "AddExternalIPConfigs", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// AddExternalIPRoute indicates an expected call of AddExternalIPRoute. -func (mr *MockInterfaceMockRecorder) AddExternalIPRoute(arg0 any) *gomock.Call { +// AddExternalIPConfigs indicates an expected call of AddExternalIPConfigs. +func (mr *MockInterfaceMockRecorder) AddExternalIPConfigs(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalIPRoute", reflect.TypeOf((*MockInterface)(nil).AddExternalIPRoute), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalIPConfigs", reflect.TypeOf((*MockInterface)(nil).AddExternalIPConfigs), arg0, arg1) } // AddLocalAntreaFlexibleIPAMPodRule mocks base method. @@ -113,18 +113,18 @@ func (mr *MockInterfaceMockRecorder) AddLocalAntreaFlexibleIPAMPodRule(arg0 any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLocalAntreaFlexibleIPAMPodRule", reflect.TypeOf((*MockInterface)(nil).AddLocalAntreaFlexibleIPAMPodRule), arg0) } -// AddNodePort mocks base method. -func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { +// AddNodePortConfigs mocks base method. +func (m *MockInterface) AddNodePortConfigs(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddNodePort", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "AddNodePortConfigs", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// AddNodePort indicates an expected call of AddNodePort. -func (mr *MockInterfaceMockRecorder) AddNodePort(arg0, arg1, arg2 any) *gomock.Call { +// AddNodePortConfigs indicates an expected call of AddNodePortConfigs. +func (mr *MockInterfaceMockRecorder) AddNodePortConfigs(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePort", reflect.TypeOf((*MockInterface)(nil).AddNodePort), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePortConfigs", reflect.TypeOf((*MockInterface)(nil).AddNodePortConfigs), arg0, arg1, arg2) } // AddOrUpdateNodeNetworkPolicyIPSet mocks base method. @@ -239,18 +239,18 @@ func (mr *MockInterfaceMockRecorder) DeleteEgressRule(arg0, arg1 any) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteEgressRule", reflect.TypeOf((*MockInterface)(nil).DeleteEgressRule), arg0, arg1) } -// DeleteExternalIPRoute mocks base method. -func (m *MockInterface) DeleteExternalIPRoute(arg0 net.IP) error { +// DeleteExternalIPConfigs mocks base method. +func (m *MockInterface) DeleteExternalIPConfigs(arg0 string, arg1 net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteExternalIPRoute", arg0) + ret := m.ctrl.Call(m, "DeleteExternalIPConfigs", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// DeleteExternalIPRoute indicates an expected call of DeleteExternalIPRoute. -func (mr *MockInterfaceMockRecorder) DeleteExternalIPRoute(arg0 any) *gomock.Call { +// DeleteExternalIPConfigs indicates an expected call of DeleteExternalIPConfigs. +func (mr *MockInterfaceMockRecorder) DeleteExternalIPConfigs(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExternalIPRoute", reflect.TypeOf((*MockInterface)(nil).DeleteExternalIPRoute), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteExternalIPConfigs", reflect.TypeOf((*MockInterface)(nil).DeleteExternalIPConfigs), arg0, arg1) } // DeleteLocalAntreaFlexibleIPAMPodRule mocks base method. @@ -295,18 +295,18 @@ func (mr *MockInterfaceMockRecorder) DeleteNodeNetworkPolicyIPTables(arg0, arg1 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodeNetworkPolicyIPTables", reflect.TypeOf((*MockInterface)(nil).DeleteNodeNetworkPolicyIPTables), arg0, arg1) } -// DeleteNodePort mocks base method. -func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { +// DeleteNodePortConfigs mocks base method. +func (m *MockInterface) DeleteNodePortConfigs(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteNodePort", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "DeleteNodePortConfigs", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// DeleteNodePort indicates an expected call of DeleteNodePort. -func (mr *MockInterfaceMockRecorder) DeleteNodePort(arg0, arg1, arg2 any) *gomock.Call { +// DeleteNodePortConfigs indicates an expected call of DeleteNodePortConfigs. +func (mr *MockInterfaceMockRecorder) DeleteNodePortConfigs(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePort", reflect.TypeOf((*MockInterface)(nil).DeleteNodePort), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePortConfigs", reflect.TypeOf((*MockInterface)(nil).DeleteNodePortConfigs), arg0, arg1, arg2) } // DeleteRouteForLink mocks base method. diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 3025db1bbbd..3eee6ba1d00 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -48,6 +48,7 @@ const ( SNATTarget = "SNAT" DNATTarget = "DNAT" RejectTarget = "REJECT" + NotrackTarget = "NOTRACK" PreRoutingChain = "PREROUTING" InputChain = "INPUT" @@ -101,7 +102,7 @@ type Interface interface { DeleteChain(protocol Protocol, table string, chain string) error - ListRules(table string, chain string) ([]string, error) + ListRules(protocol Protocol, table string, chain string) (map[Protocol][]string, error) Restore(data string, flush bool, useIPv6 bool) error @@ -312,14 +313,18 @@ func (c *Client) DeleteChain(protocol Protocol, table string, chain string) erro } // ListRules lists all rules from a chain in a table. -func (c *Client) ListRules(table string, chain string) ([]string, error) { - var allRules []string +func (c *Client) ListRules(protocol Protocol, table string, chain string) (map[Protocol][]string, error) { + allRules := make(map[Protocol][]string) for p := range c.ipts { - rules, err := c.ipts[p].List(table, chain) + ipt := c.ipts[p] + if !matchProtocol(ipt, protocol) { + continue + } + rules, err := ipt.List(table, chain) if err != nil { - return rules, fmt.Errorf("error getting rules from table %s chain %s protocol %s: %v", table, chain, p, err) + return nil, fmt.Errorf("error getting rules from table %s chain %s protocol %s: %v", table, chain, p, err) } - allRules = append(allRules, rules...) + allRules[p] = rules } return allRules, nil } diff --git a/pkg/agent/util/iptables/testing/mock_iptables_linux.go b/pkg/agent/util/iptables/testing/mock_iptables_linux.go index c26e7e3ca39..3b2aaeb3e98 100644 --- a/pkg/agent/util/iptables/testing/mock_iptables_linux.go +++ b/pkg/agent/util/iptables/testing/mock_iptables_linux.go @@ -140,18 +140,18 @@ func (mr *MockInterfaceMockRecorder) InsertRule(arg0, arg1, arg2, arg3 any) *gom } // ListRules mocks base method. -func (m *MockInterface) ListRules(arg0, arg1 string) ([]string, error) { +func (m *MockInterface) ListRules(arg0 iptables.Protocol, arg1, arg2 string) (map[iptables.Protocol][]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListRules", arg0, arg1) - ret0, _ := ret[0].([]string) + ret := m.ctrl.Call(m, "ListRules", arg0, arg1, arg2) + ret0, _ := ret[0].(map[iptables.Protocol][]string) ret1, _ := ret[1].(error) return ret0, ret1 } // ListRules indicates an expected call of ListRules. -func (mr *MockInterfaceMockRecorder) ListRules(arg0, arg1 any) *gomock.Call { +func (mr *MockInterfaceMockRecorder) ListRules(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListRules", reflect.TypeOf((*MockInterface)(nil).ListRules), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListRules", reflect.TypeOf((*MockInterface)(nil).ListRules), arg0, arg1, arg2) } // Restore mocks base method. diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index 9f87aa1264d..84069191e9b 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -745,6 +745,9 @@ func testProxyHairpin(t *testing.T, isIPv6 bool) { createAgnhostPod(t, data, agnhostHost, node, true) t.Run("HostNetwork Endpoints", func(t *testing.T) { skipIfProxyAllDisabled(t, data) + // AntreProxy with proxyAll enabled doesn't handle ClusterIP traffic sourced from local Node when kube-proxy + // presents. This test can only work as expected without kube-proxy. + skipIfKubeProxyEnabled(t, data) testProxyIntraNodeHairpinCases(data, t, expectedVirtualIP, agnhostHost, clusterIPUrl, workerNodePortClusterUrl, workerNodePortLocalUrl, lbClusterUrl, lbLocalUrl) testProxyInterNodeHairpinCases(data, t, true, expectedControllerIP, nodeName(0), clusterIPUrl, controllerNodePortClusterUrl, lbClusterUrl) })