diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 9b6c51201ad..26757e6f539 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -174,27 +174,27 @@ func run(o *Options) error { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) proxyFull := o.config.AntreaProxyFull - var nodePortIPv4Map, nodePortIPv6Map map[int][]net.IP + var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP if proxyFull { - nodePortIPv4Map, nodePortIPv6Map, err = getAvailableNodePortIPs(o.config.NodePortAddresses, o.config.HostGateway) + nodePortAddressesIPv4, nodePortAddressesIPv6, err = getAvailableNodePortAddresses(o.config.NodePortAddresses) if err != nil { return fmt.Errorf("getting available NodePort IP addresses failed: %v", err) } - if v4Enabled && len(nodePortIPv4Map) == 0 { + if v4Enabled && len(nodePortAddressesIPv4) == 0 { return fmt.Errorf("no qualified NodePort IPv4 addresses was found") } - if v6Enabled && len(nodePortIPv6Map) == 0 { + if v6Enabled && len(nodePortAddressesIPv6) == 0 { return fmt.Errorf("no qualified NodePort IPv6 addresses was found") } } switch { case v4Enabled && v6Enabled: - proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortIPv4Map, nodePortIPv6Map, proxyFull) + proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyFull) case v4Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortIPv4Map, proxyFull) + proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyFull) case v6Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortIPv6Map, proxyFull) + proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyFull) default: return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") } diff --git a/cmd/antrea-agent/util.go b/cmd/antrea-agent/util.go index af10b3d079f..f7278aefc83 100644 --- a/cmd/antrea-agent/util.go +++ b/cmd/antrea-agent/util.go @@ -20,45 +20,36 @@ import ( "antrea.io/antrea/pkg/agent/util" ) -func getAvailableNodePortIPs(nodePortIPsFromConfig []string, gateway string) (map[int][]net.IP, map[int][]net.IP, error) { +func getAvailableNodePortAddresses(nodePortAddressesFromConfig []string) ([]net.IP, []net.IP, error) { // Get all IP addresses of Node - nodeIPv4Map, nodeIPv6Map, err := util.GetAllNodeIPs() + nodeAddressesIPv4, nodeAddressesIPv6, err := util.GetAllNodeAddresses() if err != nil { return nil, nil, err } - // IP address of Antrea gateway should not be NodePort IP as it cannot be accessed from outside the Cluster. - gatewayIfIndex := util.GetIndexByName(gateway) - delete(nodeIPv4Map, gatewayIfIndex) - delete(nodeIPv6Map, gatewayIfIndex) - // If option `NodePortAddresses` is not set, then all Node IP addresses will be used as NodePort IP address. - if len(nodePortIPsFromConfig) == 0 { - return nodeIPv4Map, nodeIPv6Map, nil + if len(nodePortAddressesFromConfig) == 0 { + return nodeAddressesIPv4, nodeAddressesIPv6, nil } var nodePortIPNets []*net.IPNet - for _, nodePortIP := range nodePortIPsFromConfig { + for _, nodePortIP := range nodePortAddressesFromConfig { _, ipNet, _ := net.ParseCIDR(nodePortIP) nodePortIPNets = append(nodePortIPNets, ipNet) } - nodePortIPv4Map, nodePortIPv6Map := make(map[int][]net.IP), make(map[int][]net.IP) + var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP for _, nodePortIPNet := range nodePortIPNets { - for index, ips := range nodeIPv4Map { - for i := range ips { - if nodePortIPNet.Contains(ips[i]) { - nodePortIPv4Map[index] = append(nodePortIPv4Map[index], ips[i]) - } + for i := range nodeAddressesIPv4 { + if nodePortIPNet.Contains(nodeAddressesIPv4[i]) { + nodePortAddressesIPv4 = append(nodePortAddressesIPv4, nodeAddressesIPv4[i]) } } - for index, ips := range nodeIPv6Map { - for i := range ips { - if nodePortIPNet.Contains(ips[i]) { - nodePortIPv6Map[index] = append(nodePortIPv6Map[index], ips[i]) - } + for i := range nodeAddressesIPv6 { + if nodePortIPNet.Contains(nodeAddressesIPv6[i]) { + nodePortAddressesIPv6 = append(nodePortAddressesIPv6, nodeAddressesIPv6[i]) } } } - return nodePortIPv4Map, nodePortIPv6Map, nil + return nodePortAddressesIPv4, nodePortAddressesIPv6, nil } diff --git a/pkg/agent/config/node_config.go b/pkg/agent/config/node_config.go index e06063725de..e52a4e392ad 100644 --- a/pkg/agent/config/node_config.go +++ b/pkg/agent/config/node_config.go @@ -49,8 +49,6 @@ var ( // host Service routing entry. ServiceGWHairpinIPv4 = net.ParseIP("169.254.169.253") ServiceGWHairpinIPv6 = net.ParseIP("fc01::aabb:ccdd:eeff") - - DummyNodePortSvcIP = net.ParseIP("0.0.0.0") ) type GatewayConfig struct { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 8bb309df06c..fa43770785d 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -20,7 +20,6 @@ import ( "net" "antrea.io/libOpenflow/protocol" - v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" @@ -109,7 +108,7 @@ type Client interface { // action to maintain the LB decision. // The group with the groupID must be installed before, otherwise the // installation will fail. - InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error + InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error // UninstallServiceFlows removes flows installed by InstallServiceFlows. UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error // InstallLoadBalancerServiceFromOutsideFlows installs flows for LoadBalancer Service traffic from outside node. @@ -120,19 +119,6 @@ type Client interface { // UninstallLoadBalancerServiceFromOutsideFlows removes flows installed by InstallLoadBalancerServiceFromOutsideFlows. UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error - // InstallInitNodePortClassifierFlows installs the first clause flow of conjunction which is used to classify the first packet of - // Service NodePort, with every NodePort IP address as destination IP address. - InstallInitNodePortClassifierFlows(nodePortIPMap map[int][]net.IP, isIPv6 bool) error - - // InstallServiceClassifierFlow installs flows to classify the first packet of Service. For NodePort/LoadBalancer - // whose externalTrafficPolicy is Cluster, or NodePort/LoadBalancer whose externalTrafficPolicy is Local and client - // is from localhost, the flow will set a register to indicate that the packet requires SNAT. The flow will also - // generate a learned flow to rewrite the destination MAC of response packet whose request packet is from remote - // client. - InstallServiceClassifierFlow(svcType v1.ServiceType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, nodeLocalExternal bool) error - // UninstallServiceClassifierFlow removes flows installed by InstallServiceClassifierFlow. - UninstallServiceClassifierFlow(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error - // GetFlowTableStatus should return an array of flow table status, all existing flow tables should be included in the list. GetFlowTableStatus() []binding.TableStatus @@ -537,10 +523,6 @@ func generateServicePortFlowCacheKey(svcIP net.IP, svcPort uint16, protocol bind return fmt.Sprintf("S%s%s%x", svcIP, protocol, svcPort) } -func generateServiceClassifierFlowCacheKey(svcIP net.IP, svcPort uint16, protocol binding.Protocol) string { - return fmt.Sprintf("S%s%s%x/C", svcIP, protocol, svcPort) -} - func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() @@ -585,38 +567,13 @@ func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint prox return c.deleteFlows(c.serviceFlowCache, cacheKey) } -func (c *client) InstallInitNodePortClassifierFlows(nodePortIPMap map[int][]net.IP, isIPv6 bool) error { - flows := c.initServiceClassifierFlows(nodePortIPMap, isIPv6) - if err := c.ofEntryOperations.AddAll(flows); err != nil { - return err - } - c.defaultServiceFlows = append(c.defaultServiceFlows, flows...) - return nil -} - -func (c *client) InstallServiceClassifierFlow(svcType v1.ServiceType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, nodeLocalExternal bool) error { - c.replayMutex.RLock() - defer c.replayMutex.RUnlock() - - flows := c.serviceClassifierFlow(svcType, svcIP, svcPort, protocol, nodeLocalExternal) - cacheKey := generateServiceClassifierFlowCacheKey(svcIP, svcPort, protocol) - return c.addFlows(c.serviceFlowCache, cacheKey, flows) -} - -func (c *client) UninstallServiceClassifierFlow(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error { - c.replayMutex.RLock() - defer c.replayMutex.RUnlock() - cacheKey := generateServiceClassifierFlowCacheKey(svcIP, svcPort, protocol) - return c.deleteFlows(c.serviceFlowCache, cacheKey) -} - -func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { +func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() var flows []binding.Flow - flows = append(flows, c.serviceLBFlows(groupID, svcIP, svcPort, protocol, affinityTimeout != 0)...) + flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal)) if affinityTimeout != 0 { - flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout)) + flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal)) } cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol) return c.addFlows(c.serviceFlowCache, cacheKey, flows) @@ -666,7 +623,7 @@ func (c *client) InstallDefaultServiceFlows() error { if err := c.ofEntryOperations.AddAll(flows); err != nil { return err } - c.defaultServiceFlows = append(c.defaultServiceFlows, flows...) + c.defaultServiceFlows = flows return nil } diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 29a4f2a6ddc..11b63b52285 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -26,7 +26,6 @@ import ( "antrea.io/libOpenflow/protocol" "antrea.io/ofnet/ofctrl" - v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -52,7 +51,6 @@ const ( serviceConntrackTable binding.TableIDType = 24 // serviceConntrackTable use a new ct_zone to transform SNATed connections. conntrackTable binding.TableIDType = 30 conntrackStateTable binding.TableIDType = 31 - serviceClassifierTable binding.TableIDType = 35 sessionAffinityTable binding.TableIDType = 40 dnatTable binding.TableIDType = 40 serviceLBTable binding.TableIDType = 41 @@ -96,14 +94,6 @@ const ( ipv6MulticastAddr = "FF00::/8" // IPv6 link-local prefix ipv6LinkLocalAddr = "FE80::/10" - - // The conjunction IDs which are used to classify the first packet of NodePort/LoadBalance at table serviceClassifierTable. - // - For NodePort/LoadBalancer whose externalTrafficPolicy is Cluster, SNAT is required. - // - For NodePort/LoadBalancer whose externalTrafficPolicy is Local, SNAT is not required. - clusterConjIDIPv4 = uint32(41) - localConjIDIPv4 = uint32(42) - clusterConjDIPv6 = uint32(61) - localConjIDIPv6 = uint32(62) ) type ofAction int32 @@ -149,7 +139,6 @@ var ( {serviceConntrackTable, "serviceConntrack"}, {conntrackTable, "ConntrackZone"}, {conntrackStateTable, "ConntrackState"}, - {serviceClassifierTable, "serviceClassifier"}, {dnatTable, "DNAT(SessionAffinity)"}, {sessionAffinityTable, "SessionAffinity"}, {serviceLBTable, "ServiceLB"}, @@ -247,12 +236,10 @@ const ( endpointIPReg regType = 3 // Use reg3 to store endpoint IP endpointPortReg regType = 4 // Use reg4[0..15] to store endpoint port serviceLearnReg = endpointPortReg // Use reg4[16..18] to store endpoint selection states. - isNodePortReg = endpointPortReg // Use reg4[19] to store the status of whether Service is NodePort. - serviceSNATReg = endpointPortReg // Use reg4[20] to store the status of whether Service traffic from gateway requires SNAT. + serviceSNATReg = endpointPortReg // Use reg4[19] to store the status of whether Service traffic from gateway requires SNAT. EgressReg regType = 5 IngressReg regType = 6 - - TraceflowReg regType = 9 // Use reg9[28..31] to store traceflow dataplaneTag. + TraceflowReg regType = 9 // Use reg9[28..31] to store traceflow dataplaneTag. // CNPDenyConjIDReg reuses reg3 which will also be used for storing endpoint IP to store the rule ID. Since // the service selection will finish when a packet hitting NetworkPolicy related rules, there is no conflict. CNPDenyConjIDReg regType = 3 @@ -266,8 +253,6 @@ const ( marksRegServiceNeedLearn uint32 = 0b011 // marksRegServiceNeedSNAT indicates that the packet requires SNAT. marksRegServiceNeedSNAT uint32 = 0b1 - // marksServiceIsNodePort indicates that the Service is NodePort. - marksServiceIsNodePort uint32 = 0b1 CtZone = 0xfff0 CtZoneV6 = 0xffe6 @@ -371,10 +356,7 @@ var ( // 3. NodePort/LoadBalancer client is from remote, ExternalTrafficPolicy is Cluster, // Endpoint is on host network. // When the Endpoint is not on host network, Antrea gateway IP is used to perform SNAT. - serviceSNATMarkRange = binding.Range{20, 20} - // isNodePortRegRange takes a 1-bit range of register isNodePortReg to mark whether the - // Service is NodePort. - isNodePortRegRange = binding.Range{19, 19} + serviceSNATMarkRange = binding.Range{19, 19} // metricIngressRuleIDRange takes 0..31 range of ct_label to store the ingress rule ID. metricIngressRuleIDRange = binding.Range{0, 31} // metricEgressRuleIDRange takes 32..63 range of ct_label to store the egress rule ID. @@ -433,7 +415,7 @@ type client struct { pipeline map[binding.TableIDType]binding.Table // Flow caches for corresponding deletions. nodeFlowCache, podFlowCache, serviceFlowCache, snatFlowCache, tfFlowCache *flowCategoryCache - // "fixed" flows installInstallNodePortIPFlowsed by the agent after initialization and which do not change during + // "fixed" flows installed by the agent after initialization and which do not change during // the lifetime of the client. gatewayFlows, defaultServiceFlows, defaultTunnelFlows, hostNetworkingFlows []binding.Flow // ofEntryOperations is a wrapper interface for OpenFlow entry Add / Modify / Delete operations. It @@ -642,20 +624,11 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow { flows := c.conntrackBasicFlows(category) if c.enableProxy { // Replace the default flow with multiple resubmits actions. - if c.enableProxyFull { - flows = append(flows, connectionTrackStateTable.BuildFlow(priorityMiss). - Cookie(c.cookieAllocator.Request(category).Raw()). - Action().ResubmitToTable(serviceClassifierTable). - Action().ResubmitToTable(sessionAffinityTable). - Action().ResubmitToTable(serviceLBTable). - Done()) - } else { - flows = append(flows, connectionTrackStateTable.BuildFlow(priorityMiss). - Cookie(c.cookieAllocator.Request(category).Raw()). - Action().ResubmitToTable(sessionAffinityTable). - Action().ResubmitToTable(serviceLBTable). - Done()) - } + flows = append(flows, connectionTrackStateTable.BuildFlow(priorityMiss). + Cookie(c.cookieAllocator.Request(category).Raw()). + Action().ResubmitToTable(sessionAffinityTable). + Action().ResubmitToTable(serviceLBTable). + Done()) for _, proto := range c.ipProtocols { gatewayIP := c.nodeConfig.GatewayConfig.IPv4 @@ -710,6 +683,7 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow { // a Node cannot hold more than one pod like the pod. There is no point to expose the pod as NodePort, // as it makes no difference to access it directly. When externalTrafficPolicy is Local, there is // just only one Endpoint, and it's not necessary to expose the pod with NodePort. + // This flow is also used to match the first packet of ClusterIP and the Endpoint is on host network. serviceConnectionTrackCommitTable.BuildFlow(priorityHigh).MatchProtocol(proto). MatchRegRange(int(PortCacheReg), config.HostGatewayOFPort, ofPortRegRange). Cookie(c.cookieAllocator.Request(category).Raw()). @@ -1382,31 +1356,20 @@ func (c *client) l3FwdServiceDefaultFlowsViaGW(ipProto binding.Protocol, categor gatewayMAC := c.nodeConfig.GatewayConfig.MAC flows := []binding.Flow{ - /* This flow is used to match the packets of Service traffic: - - NodePort/LoadBalancer request packets which pass through Antrea gateway and the Service Endpoint is on host network. - - ClusterIP request packets which are from Antrea gateway and the Service Endpoint is on host network. - The matched packets should leave through Antrea gateway, however, they also enter through Antrea gateway. This - is hairpin traffic. - */ + // This flow is used to match the packets of Service traffic: + // - NodePort/LoadBalancer request packets which pass through Antrea gateway and the Service Endpoint is on host network. + // - ClusterIP request packets which are from Antrea gateway and the Service Endpoint is on host network. + // - NodePort/LoadBalancer/ClusterIP response packets. + // The matched packets should leave through Antrea gateway, however, they also enter through Antrea gateway. This + // is hairpin traffic. c.pipeline[l3ForwardingTable].BuildFlow(priorityLow).MatchProtocol(ipProto). MatchCTMark(ServiceCTMark, nil). - MatchCTStateRpl(false). MatchCTStateTrk(true). MatchRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange). Action().SetDstMAC(gatewayMAC). Action().ResubmitToTable(l3DecTTLTable). Cookie(c.cookieAllocator.Request(category).Raw()). Done(), - // This flow is used to match the response packets of NodePort/LoadBalancer traffic. The destination MAC address - // and output port will be set on serviceResponseProcessTable. - c.pipeline[l3ForwardingTable].BuildFlow(priorityLow).MatchProtocol(ipProto). - MatchCTMark(ServiceCTMark, nil). - MatchCTStateRpl(true). - MatchCTStateTrk(true). - MatchRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange). - Action().ResubmitToTable(l3DecTTLTable). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done(), } return flows } @@ -2086,114 +2049,6 @@ func (c *client) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint32, loc } } -func (c *client) initServiceClassifierFlows(nodePortIPMap map[int][]net.IP, isIPv6 bool) []binding.Flow { - clusterConjID := clusterConjIDIPv4 - localConjID := localConjIDIPv4 - ipProtocol := binding.ProtocolIP - serviceGWHairpinIP := config.ServiceGWHairpinIPv4 - if isIPv6 { - clusterConjID = clusterConjDIPv6 - localConjID = localConjIDIPv6 - ipProtocol = binding.ProtocolIPv6 - serviceGWHairpinIP = config.ServiceGWHairpinIPv6 - } - - var flows []binding.Flow - for _, ips := range nodePortIPMap { - for _, ip := range ips { - flows = append(flows, - // This flow is used to match the first packet's destination IP address: - // 1. NodePort Service whose externalTrafficPolicy is Cluster, and client is from remote/localhost. - // 2. NodePort Service whose externalTrafficPolicy is Local, and client is from remote. - c.pipeline[serviceClassifierTable].BuildFlow(priorityNormal). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - MatchProtocol(ipProtocol). - MatchDstIP(ip). - Action().Conjunction(clusterConjID, 1, 2). - Action().Conjunction(localConjID, 1, 2). - Done(), - ) - } - } - flows = append(flows, - c.pipeline[serviceClassifierTable].BuildFlow(priorityNormal). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - MatchProtocol(ipProtocol). - MatchDstIP(serviceGWHairpinIP). - Action().Conjunction(clusterConjID, 1, 2). - Action().Conjunction(localConjID, 1, 2). - Done(), - ) - - flows = append(flows, - // This flow is used to perform actions for the first packet of NodePort Service whose externalTrafficPolicy is - // Cluster, and client is from remote/localhost. - c.pipeline[serviceClassifierTable].BuildFlow(priorityNormal). - MatchProtocol(ipProtocol). - MatchConjID(clusterConjID). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - Action().LoadRegRange(int(serviceSNATReg), marksRegServiceNeedSNAT, serviceSNATMarkRange). - Action().LoadRegRange(int(isNodePortReg), marksServiceIsNodePort, isNodePortRegRange). - Done(), - // This flow is used to perform actions for the first packet of NodePort Service whose externalTrafficPolicy is - // Local, and client is from remote. - c.pipeline[serviceClassifierTable].BuildFlow(priorityNormal). - MatchProtocol(ipProtocol). - MatchConjID(localConjID). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - Action().LoadRegRange(int(isNodePortReg), marksServiceIsNodePort, isNodePortRegRange). - Done(), - ) - return flows -} - -func (c *client) serviceClassifierFlow(svcType v1.ServiceType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, nodeLocalExternal bool) []binding.Flow { - var flows []binding.Flow - isIPv6 := false - if protocol == binding.ProtocolTCPv6 || protocol == binding.ProtocolUDPv6 || protocol == binding.ProtocolSCTPv6 { - isIPv6 = true - } - - if svcType == v1.ServiceTypeNodePort { - clusterConjID := clusterConjIDIPv4 - localConjID := localConjIDIPv4 - if isIPv6 { - clusterConjID = clusterConjDIPv6 - localConjID = localConjIDIPv6 - } - - if nodeLocalExternal { - flows = append(flows, - c.pipeline[serviceClassifierTable].BuildFlow(priorityHigh). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - MatchProtocol(protocol). - MatchDstPort(svcPort, nil). - Action().Conjunction(localConjID, 2, 2). - Done(), - ) - } else { - flows = append(flows, - c.pipeline[serviceClassifierTable].BuildFlow(priorityNormal). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - MatchProtocol(protocol). - MatchDstPort(svcPort, nil). - Action().Conjunction(clusterConjID, 2, 2). - Done()) - } - } else { - if !nodeLocalExternal { - flows = append(flows, c.pipeline[serviceClassifierTable].BuildFlow(priorityNormal). - MatchProtocol(protocol). - MatchDstIP(svcIP). - MatchDstPort(svcPort, nil). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - Action().LoadRegRange(int(serviceSNATReg), marksRegServiceNeedSNAT, serviceSNATMarkRange). - Done()) - } - } - return flows -} - // loadBalancerServiceFromOutsideFlow generates the flow to forward LoadBalancer service traffic from outside node // to gateway. kube-proxy will then handle the traffic. // This flow is for Windows Node only. @@ -2210,21 +2065,23 @@ func (c *client) loadBalancerServiceFromOutsideFlow(svcIP net.IP, svcPort uint16 // serviceLearnFlow generates the flow with learn action which adds new flows in // sessionAffinityTable according to the Endpoint selection decision. -func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) binding.Flow { +func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) binding.Flow { // Using unique cookie ID here to avoid learned flow cascade deletion. cookieID := c.cookieAllocator.RequestWithObjectID(cookie.Service, uint32(groupID)).Raw() + var learnFlowBuilder binding.FlowBuilder - if svcIP.Equal(config.DummyNodePortSvcIP) { + if !nodeLocalExternal { learnFlowBuilder = c.pipeline[serviceLBTable].BuildFlow(priorityLow). MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLearn, serviceLearnRegRange). MatchDstIP(svcIP). MatchProtocol(protocol). MatchDstPort(svcPort, nil). - Cookie(cookieID) + Cookie(cookieID). + Action().LoadRegRange(int(serviceSNATReg), marksRegServiceNeedSNAT, serviceSNATMarkRange) } else { learnFlowBuilder = c.pipeline[serviceLBTable].BuildFlow(priorityLow). - MatchRegRange(int(isNodePortReg), marksServiceIsNodePort, isNodePortRegRange). MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLearn, serviceLearnRegRange). + MatchDstPort(svcPort, nil). MatchProtocol(protocol). Cookie(cookieID) } @@ -2284,7 +2141,7 @@ func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svc // serviceLBFlows generates the flow which uses the specific group to do Endpoint // selection. -func (c *client) serviceLBFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, withSessionAffinity bool) []binding.Flow { +func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, withSessionAffinity, nodeLocalExternal bool) binding.Flow { var lbResultMark uint32 if withSessionAffinity { lbResultMark = marksRegServiceNeedLearn @@ -2292,35 +2149,30 @@ func (c *client) serviceLBFlows(groupID binding.GroupIDType, svcIP net.IP, svcPo lbResultMark = marksRegServiceSelected } - var flows []binding.Flow - // This flow is used to match the first packet of non-NodePort. - if !svcIP.Equal(config.DummyNodePortSvcIP) { - flows = append(flows, c.pipeline[serviceLBTable].BuildFlow(priorityNormal). + if !nodeLocalExternal { + return c.pipeline[serviceLBTable].BuildFlow(priorityNormal). MatchProtocol(protocol). MatchDstPort(svcPort, nil). MatchDstIP(svcIP). MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLB, serviceLearnRegRange). Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange). Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange). + Action().LoadRegRange(int(serviceSNATReg), marksRegServiceNeedSNAT, serviceSNATMarkRange). Action().Group(groupID). Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - Done(), - ) + Done() } else { - unionVal := (marksServiceIsNodePort << serviceLearnRegRange.Length()) + marksRegServiceNeedLB - flows = append(flows, - c.pipeline[serviceLBTable].BuildFlow(priorityNormal). - MatchProtocol(protocol). - MatchDstPort(svcPort, nil). - MatchRegRange(int(serviceLearnReg), unionVal, binding.Range{16, 19}). - Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange). - Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange). - Action().Group(groupID). - Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). - Done(), - ) + return c.pipeline[serviceLBTable].BuildFlow(priorityNormal). + MatchProtocol(protocol). + MatchDstPort(svcPort, nil). + MatchDstIP(svcIP). + MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLB, serviceLearnRegRange). + Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange). + Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange). + Action().Group(groupID). + Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). + Done() } - return flows } // endpointDNATFlow generates the flow which transforms the Service Cluster IP @@ -2498,7 +2350,6 @@ func (c *client) generatePipeline() { if c.enableProxyFull { c.pipeline[serviceHairpinTable] = bridge.CreateTable(serviceHairpinTable, serviceConntrackTable, binding.TableMissActionNext) c.pipeline[serviceConntrackTable] = bridge.CreateTable(serviceConntrackTable, conntrackTable, binding.TableMissActionNext) - c.pipeline[serviceClassifierTable] = bridge.CreateTable(serviceClassifierTable, binding.LastTableID, binding.TableMissActionNone) c.pipeline[serviceConntrackCommitTable] = bridge.CreateTable(serviceConntrackCommitTable, hairpinSNATTable, binding.TableMissActionNext) } else { c.pipeline[serviceHairpinTable] = bridge.CreateTable(serviceHairpinTable, conntrackTable, binding.TableMissActionNext) diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index f554015919a..7748e7a17e4 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -25,7 +25,6 @@ import ( openflow "antrea.io/antrea/pkg/ovs/openflow" proxy "antrea.io/antrea/third_party/proxy" gomock "github.com/golang/mock/gomock" - v1 "k8s.io/api/core/v1" net "net" reflect "reflect" ) @@ -335,20 +334,6 @@ func (mr *MockClientMockRecorder) InstallGatewayFlows() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallGatewayFlows", reflect.TypeOf((*MockClient)(nil).InstallGatewayFlows)) } -// InstallInitNodePortClassifierFlows mocks base method -func (m *MockClient) InstallInitNodePortClassifierFlows(arg0 map[int][]net.IP, arg1 bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallInitNodePortClassifierFlows", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// InstallInitNodePortClassifierFlows indicates an expected call of InstallInitNodePortClassifierFlows -func (mr *MockClientMockRecorder) InstallInitNodePortClassifierFlows(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallInitNodePortClassifierFlows", reflect.TypeOf((*MockClient)(nil).InstallInitNodePortClassifierFlows), arg0, arg1) -} - // InstallLoadBalancerServiceFromOutsideFlows mocks base method func (m *MockClient) InstallLoadBalancerServiceFromOutsideFlows(arg0 net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() @@ -433,32 +418,18 @@ func (mr *MockClientMockRecorder) InstallSNATMarkFlows(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallSNATMarkFlows", reflect.TypeOf((*MockClient)(nil).InstallSNATMarkFlows), arg0, arg1) } -// InstallServiceClassifierFlow mocks base method -func (m *MockClient) InstallServiceClassifierFlow(arg0 v1.ServiceType, arg1 net.IP, arg2 uint16, arg3 openflow.Protocol, arg4 bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallServiceClassifierFlow", arg0, arg1, arg2, arg3, arg4) - ret0, _ := ret[0].(error) - return ret0 -} - -// InstallServiceClassifierFlow indicates an expected call of InstallServiceClassifierFlow -func (mr *MockClientMockRecorder) InstallServiceClassifierFlow(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallServiceClassifierFlow", reflect.TypeOf((*MockClient)(nil).InstallServiceClassifierFlow), arg0, arg1, arg2, arg3, arg4) -} - // InstallServiceFlows mocks base method -func (m *MockClient) InstallServiceFlows(arg0 openflow.GroupIDType, arg1 net.IP, arg2 uint16, arg3 openflow.Protocol, arg4 uint16) error { +func (m *MockClient) InstallServiceFlows(arg0 openflow.GroupIDType, arg1 net.IP, arg2 uint16, arg3 openflow.Protocol, arg4 uint16, arg5 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallServiceFlows", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "InstallServiceFlows", arg0, arg1, arg2, arg3, arg4, arg5) ret0, _ := ret[0].(error) return ret0 } // InstallServiceFlows indicates an expected call of InstallServiceFlows -func (mr *MockClientMockRecorder) InstallServiceFlows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallServiceFlows(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallServiceFlows", reflect.TypeOf((*MockClient)(nil).InstallServiceFlows), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallServiceFlows", reflect.TypeOf((*MockClient)(nil).InstallServiceFlows), arg0, arg1, arg2, arg3, arg4, arg5) } // InstallServiceGroup mocks base method @@ -750,20 +721,6 @@ func (mr *MockClientMockRecorder) UninstallSNATMarkFlows(arg0 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallSNATMarkFlows", reflect.TypeOf((*MockClient)(nil).UninstallSNATMarkFlows), arg0) } -// UninstallServiceClassifierFlow mocks base method -func (m *MockClient) UninstallServiceClassifierFlow(arg0 net.IP, arg1 uint16, arg2 openflow.Protocol) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UninstallServiceClassifierFlow", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 -} - -// UninstallServiceClassifierFlow indicates an expected call of UninstallServiceClassifierFlow -func (mr *MockClientMockRecorder) UninstallServiceClassifierFlow(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallServiceClassifierFlow", reflect.TypeOf((*MockClient)(nil).UninstallServiceClassifierFlow), arg0, arg1, arg2) -} - // UninstallServiceFlows mocks base method func (m *MockClient) UninstallServiceFlows(arg0 net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 655fb123574..00c7e19b82b 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -105,7 +105,7 @@ type proxier struct { stopChan <-chan struct{} ofClient openflow.Client routeClient route.Interface - nodePortIPMap map[int][]net.IP + nodePortAddresses []net.IP hostGateWay string isIPv6 bool proxyFullEnabled bool @@ -268,44 +268,42 @@ func smallSliceDifference(s1, s2 []string) []string { } func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { - if err := p.ofClient.InstallServiceFlows(groupID, agentconfig.DummyNodePortSvcIP, svcPort, protocol, affinityTimeout); err != nil { - return fmt.Errorf("failed to install Service NodePort load balancing flows: %w", err) + svcIP := agentconfig.ServiceGWHairpinIPv4 + if p.isIPv6 { + svcIP = agentconfig.ServiceGWHairpinIPv6 } - if err := p.ofClient.InstallServiceClassifierFlow(corev1.ServiceTypeNodePort, agentconfig.DummyNodePortSvcIP, svcPort, protocol, nodeLocalExternal); err != nil { - return fmt.Errorf("failed to install Service NodePort classifying flows: %w", err) + if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal); err != nil { + return fmt.Errorf("failed to install Service NodePort load balancing flows: %w", err) } - if err := p.routeClient.AddNodePort(p.nodePortIPMap, svcPort, protocol, p.isIPv6); err != nil { + if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol, p.isIPv6); err != nil { return fmt.Errorf("failed to install Service NodePort traffic redirecting flows: %w", err) } return nil } func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Protocol) error { - if err := p.ofClient.UninstallServiceFlows(agentconfig.DummyNodePortSvcIP, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service NodePort NodePort load balancing flows: %w", err) + svcIP := agentconfig.ServiceGWHairpinIPv4 + if p.isIPv6 { + svcIP = agentconfig.ServiceGWHairpinIPv6 } - if err := p.ofClient.UninstallServiceClassifierFlow(agentconfig.DummyNodePortSvcIP, svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service NodePort classifying flows: %w", err) + if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil { + return fmt.Errorf("failed to remove Service NodePort NodePort load balancing flows: %w", err) } - if err := p.routeClient.DeleteNodePort(p.nodePortIPMap, svcPort, protocol, p.isIPv6); err != nil { + if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol, p.isIPv6); err != nil { return fmt.Errorf("failed to remove Service NodePort traffic redirecting flows: %w", err) } return nil } -func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, - svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { +func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { - if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout); err != nil { + if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal); err != nil { return fmt.Errorf("failed to install Service LoadBalancer load balancing flows: %w", err) } if err := p.ofClient.InstallLoadBalancerServiceFromOutsideFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { return fmt.Errorf("failed to install Service LoadBalancer flows: %w", err) } - if err := p.ofClient.InstallServiceClassifierFlow(corev1.ServiceTypeLoadBalancer, net.ParseIP(ingress), svcPort, protocol, nodeLocalExternal); err != nil { - return fmt.Errorf("failed to install Service LoadBalancer classifying flows: %w", err) - } } } if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings, p.isIPv6); err != nil { @@ -323,9 +321,6 @@ func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, s if err := p.ofClient.UninstallLoadBalancerServiceFromOutsideFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { return fmt.Errorf("failed to remove Service LoadBalancer flows: %w", err) } - if err := p.ofClient.UninstallServiceClassifierFlow(net.ParseIP(ingress), svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service LoadBalancer classifying flows: %w", err) - } } } if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings, p.isIPv6); err != nil { @@ -483,7 +478,7 @@ func (p *proxier) installServices() { } // Install ClusterIP flows of current Service. - if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds())); err != nil { + if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), false); err != nil { klog.Errorf("Error when installing Service flows: %v", err) continue } @@ -705,9 +700,6 @@ func (p *proxier) Run(stopCh <-chan struct{}) { if err := p.routeClient.InitServiceProxyConfig(p.isIPv6); err != nil { panic(err) } - if err := p.ofClient.InstallInitNodePortClassifierFlows(p.nodePortIPMap, p.isIPv6); err != nil { - panic(err) - } } go p.serviceConfig.Run(stopCh) if p.endpointSliceEnabled { @@ -771,7 +763,7 @@ func NewProxier( ofClient openflow.Client, isIPv6 bool, routeClient route.Interface, - nodePortIPMap map[int][]net.IP, + nodePortAddresses []net.IP, proxyFullEnabled bool) *proxier { recorder := record.NewBroadcaster().NewRecorder( runtime.NewScheme(), @@ -801,7 +793,7 @@ func NewProxier( groupCounter: types.NewGroupCounter(isIPv6), ofClient: ofClient, routeClient: routeClient, - nodePortIPMap: nodePortIPMap, + nodePortAddresses: nodePortAddresses, isIPv6: isIPv6, proxyFullEnabled: proxyFullEnabled, endpointSliceEnabled: endpointSliceEnabled, @@ -855,15 +847,15 @@ func NewDualStackProxier( informerFactory informers.SharedInformerFactory, ofClient openflow.Client, routeClient route.Interface, - nodePortIPMap map[int][]net.IP, - nodePortIPv6Map map[int][]net.IP, + nodePortAddressesIPv4 []net.IP, + nodePortAddressesIPv6 []net.IP, proxyFullEnabled bool) *metaProxierWrapper { // Create an ipv4 instance of the single-stack proxier. - ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false, routeClient, nodePortIPMap, proxyFullEnabled) + ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyFullEnabled) // Create an ipv6 instance of the single-stack proxier. - ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true, routeClient, nodePortIPv6Map, proxyFullEnabled) + ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyFullEnabled) // Create a meta-proxier that dispatch calls between the two // single-stack proxier instances. diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index b10ab0efb29..8a19bba2db4 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -30,7 +30,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/testutil" - agentconfig "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow" ofmock "antrea.io/antrea/pkg/agent/openflow/testing" "antrea.io/antrea/pkg/agent/proxy/metrics" @@ -53,8 +52,8 @@ var ( svcNodePortIPv4 = net.ParseIP("192.168.77.100") svcNodePortIPv6 = net.ParseIP("2001::192:168:77:100") - nodePortIPv4Map = map[int][]net.IP{2: {svcNodePortIPv4}} - nodePortIPv6Map = map[int][]net.IP{2: {svcNodePortIPv6}} + nodePortAddressesIPv4 = []net.IP{svcNodePortIPv4} + nodePortAddressesIPv6 = []net.IP{svcNodePortIPv6} ) func makeNamespaceName(namespace, name string) apimachinerytypes.NamespacedName { @@ -100,7 +99,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*corev1.Endpoints)) return ept } -func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodePortIPMap map[int][]net.IP, isIPv6, proxyFullEnabled bool) *proxier { +func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodePortAddresses []net.IP, isIPv6, proxyFullEnabled bool) *proxier { hostname := "localhost" eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder( @@ -126,7 +125,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP routeClient: routeClient, serviceStringMap: map[string]k8sproxy.ServicePortName{}, isIPv6: isIPv6, - nodePortIPMap: nodePortIPMap, + nodePortAddresses: nodePortAddresses, proxyFullEnabled: proxyFullEnabled, } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2) @@ -179,18 +178,18 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { bindingProtocol = binding.ProtocolTCPv6 } mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false).Times(1) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP, isIPv6).Times(1) fp.syncProxyRules() } -func testLoadBalancer(t *testing.T, nodePortIPMap map[int][]net.IP, svcIP, ep1IP, ep2IP, loadBalancerIP net.IP, isIPv6, nodeLocalExternal bool) { +func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP, loadBalancerIP net.IP, isIPv6, nodeLocalExternal bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortIPMap, isIPv6, true) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, isIPv6, true) svcPort := 80 svcNodePort := 30008 @@ -261,29 +260,27 @@ func testLoadBalancer(t *testing.T, nodePortIPMap map[int][]net.IP, svcIP, ep1IP } mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal).Times(1) if nodeLocalExternal { groupID, _ = fp.groupCounter.Get(svcPortName, true) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) } - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal).Times(1) mockOFClient.EXPECT().InstallLoadBalancerServiceFromOutsideFlows(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - mockOFClient.EXPECT().InstallServiceClassifierFlow(corev1.ServiceTypeLoadBalancer, loadBalancerIP, uint16(svcPort), bindingProtocol, nodeLocalExternal).Times(1) - mockOFClient.EXPECT().InstallServiceClassifierFlow(corev1.ServiceTypeNodePort, agentconfig.DummyNodePortSvcIP, uint16(svcNodePort), bindingProtocol, nodeLocalExternal).Times(1) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP, isIPv6).Times(1) mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}, isIPv6).Times(1) - mockRouteClient.EXPECT().AddNodePort(nodePortIPMap, uint16(svcNodePort), bindingProtocol, isIPv6).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol, isIPv6).Times(1) fp.syncProxyRules() } -func testNodePort(t *testing.T, nodePortIPMap map[int][]net.IP, svcIP, ep1IP, ep2IP net.IP, isIPv6, nodeLocalExternal bool) { +func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP net.IP, isIPv6, nodeLocalExternal bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) mockRouteClient := routemock.NewMockInterface(ctrl) - fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortIPMap, isIPv6, true) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, isIPv6, true) svcPort := 80 svcNodePort := 31000 @@ -351,14 +348,13 @@ func testNodePort(t *testing.T, nodePortIPMap map[int][]net.IP, svcIP, ep1IP, ep } mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false).Times(1) if nodeLocalExternal { groupID, _ = fp.groupCounter.Get(svcPortName, true) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) } - mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0)).Times(1) - mockOFClient.EXPECT().InstallServiceClassifierFlow(corev1.ServiceTypeNodePort, agentconfig.DummyNodePortSvcIP, uint16(svcNodePort), bindingProtocol, nodeLocalExternal).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal).Times(1) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP, isIPv6).Times(1) mockRouteClient.EXPECT().AddNodePort(gomock.Any(), uint16(svcNodePort), bindingProtocol, isIPv6).Times(1) @@ -366,35 +362,35 @@ func testNodePort(t *testing.T, nodePortIPMap map[int][]net.IP, svcIP, ep1IP, ep } func TestLoadBalancerIPv4(t *testing.T) { - testLoadBalancer(t, nodePortIPv4Map, svcIPv4, ep1IPv4, nil, loadBalancerIPv4, false, false) + testLoadBalancer(t, nodePortAddressesIPv4, svcIPv4, ep1IPv4, nil, loadBalancerIPv4, false, false) } func TestLoadBalancerIPv4ExternalLocal(t *testing.T) { - testLoadBalancer(t, nodePortIPv4Map, svcIPv4, ep1IPv4, ep2IPv4, loadBalancerIPv4, false, true) + testLoadBalancer(t, nodePortAddressesIPv4, svcIPv4, ep1IPv4, ep2IPv4, loadBalancerIPv4, false, true) } func TestLoadBalancerIPv6(t *testing.T) { - testLoadBalancer(t, nodePortIPv6Map, svcIPv6, ep1IPv6, nil, loadBalancerIPv6, true, false) + testLoadBalancer(t, nodePortAddressesIPv6, svcIPv6, ep1IPv6, nil, loadBalancerIPv6, true, false) } func TestLoadBalancerIPv6ExternalLocal(t *testing.T) { - testLoadBalancer(t, nodePortIPv6Map, svcIPv6, ep1IPv6, ep2IPv6, loadBalancerIPv6, true, true) + testLoadBalancer(t, nodePortAddressesIPv6, svcIPv6, ep1IPv6, ep2IPv6, loadBalancerIPv6, true, true) } func TestNodePortIPv4(t *testing.T) { - testNodePort(t, nodePortIPv4Map, svcIPv4, ep1IPv4, nil, false, false) + testNodePort(t, nodePortAddressesIPv4, svcIPv4, ep1IPv4, nil, false, false) } func TestNodePortIPv4ExternalLocal(t *testing.T) { - testNodePort(t, nodePortIPv4Map, svcIPv4, ep1IPv4, ep2IPv4, false, true) + testNodePort(t, nodePortAddressesIPv4, svcIPv4, ep1IPv4, ep2IPv4, false, true) } func TestNodePortIPv6(t *testing.T) { - testNodePort(t, nodePortIPv6Map, svcIPv6, ep1IPv6, nil, true, false) + testNodePort(t, nodePortAddressesIPv6, svcIPv6, ep1IPv6, nil, true, false) } func TestNodePortIPv6ExternalLocal(t *testing.T) { - testNodePort(t, nodePortIPv6Map, svcIPv6, ep1IPv6, ep2IPv6, true, true) + testNodePort(t, nodePortAddressesIPv6, svcIPv6, ep1IPv6, ep2IPv6, true, true) } func TestClusterIPv4(t *testing.T) { @@ -471,11 +467,11 @@ func TestDualStackService(t *testing.T) { mockOFClient.EXPECT().InstallServiceGroup(groupIDv4, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDv4, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDv4, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0), false).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDv6, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCPv6, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDv6, svcIPv6, uint16(svcPort), binding.ProtocolTCPv6, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDv6, svcIPv6, uint16(svcPort), binding.ProtocolTCPv6, uint16(0), false).Times(1) fpv4.syncProxyRules() fpv6.syncProxyRules() @@ -526,7 +522,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) groupID, _ := fp.groupCounter.Get(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false).Times(1) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP, isIPv6).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) @@ -660,8 +656,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(protocolUDP, gomock.Any()).Times(2) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), protocolTCP, uint16(0)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDUDP, svcIP, uint16(svcPort), protocolUDP, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), protocolTCP, uint16(0), false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDUDP, svcIP, uint16(svcPort), protocolUDP, uint16(0), false).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) fp.syncProxyRules() @@ -721,7 +717,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv groupID, _ := fp.groupCounter.Get(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(2) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.syncProxyRules() @@ -793,7 +789,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne groupID, _ := fp.groupCounter.Get(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, true, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(corev1.DefaultClientIPServiceAffinitySeconds)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(corev1.DefaultClientIPServiceAffinitySeconds), false).Times(1) fp.syncProxyRules() } @@ -900,9 +896,9 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { groupID, _ := fp.groupCounter.Get(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort1), bindingProtocol, uint16(0)) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort1), bindingProtocol, uint16(0), false) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort1), bindingProtocol) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort2), bindingProtocol, uint16(0)) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort2), bindingProtocol, uint16(0), false) fp.syncProxyRules() @@ -989,8 +985,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { mockOFClient.EXPECT().InstallServiceGroup(groupID2, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(2) - mockOFClient.EXPECT().InstallServiceFlows(groupID1, svcIP1, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID1, svcIP1, uint16(svcPort), bindingProtocol, uint16(0), false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0), false).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP2, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index fcfb85c6755..71a75e1b1d8 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -56,10 +56,10 @@ type Interface interface { InitServiceProxyConfig(isIPv6 bool) error // AddNodePort adds configurations when a NodePort Service is created. - AddNodePort(nodePortIPMap map[int][]net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error + AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error // DeleteNodePort deletes related configurations when a NodePort Service is deleted. - DeleteNodePort(nodePortIPMap map[int][]net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error + DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error // AddClusterIPRoute adds route on K8s node for Service ClusterIP. AddClusterIPRoute(svcIP net.IP, isIPv6 bool) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index c80aa1cc147..2c82805449f 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -930,16 +930,14 @@ func (c *Client) InitServiceProxyConfig(isIPv6 bool) error { // AddNodePort is used to add IP,port:protocol entries to target ip set when a NodePort Service is added. An entry is added // for every NodePort IP. -func (c *Client) AddNodePort(nodePortIPMap map[int][]net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { +func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { ipSetName := getNodePortIPSetName(isIPv6) transProtocol := getTransProtocolStr(protocol) - for _, addrs := range nodePortIPMap { - for i := range addrs { - ipSetEntry := fmt.Sprintf("%s,%s:%d", addrs[i], transProtocol, port) - if err := ipset.AddEntry(ipSetName, ipSetEntry); err != nil { - return err - } + for i := range nodePortAddresses { + ipSetEntry := fmt.Sprintf("%s,%s:%d", nodePortAddresses[i], transProtocol, port) + if err := ipset.AddEntry(ipSetName, ipSetEntry); err != nil { + return err } } @@ -947,16 +945,14 @@ func (c *Client) AddNodePort(nodePortIPMap map[int][]net.IP, port uint16, protoc } // DeleteNodePort is used to delete related IP set entries when a NodePort Service is deleted. -func (c *Client) DeleteNodePort(nodePortIPMap map[int][]net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { +func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { ipSetName := getNodePortIPSetName(isIPv6) transProtocol := getTransProtocolStr(protocol) - for _, addrs := range nodePortIPMap { - for i := range addrs { - ipSetEntry := fmt.Sprintf("%s,%s:%d", addrs[i], transProtocol, port) - if err := ipset.DelEntry(ipSetName, ipSetEntry); err != nil { - return err - } + for i := range nodePortAddresses { + ipSetEntry := fmt.Sprintf("%s,%s:%d", nodePortAddresses[i], transProtocol, port) + if err := ipset.DelEntry(ipSetName, ipSetEntry); err != nil { + return err } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 4b9283eda43..2060743df56 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -253,11 +253,11 @@ func (c *Client) InitServiceProxyConfig(isIPv6 bool) error { return nil } -func (c *Client) AddNodePort(nodePortIPMap map[int][]net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { +func (c *Client) AddNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { return nil } -func (c *Client) DeleteNodePort(nodePortIPMap map[int][]net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { +func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protocol binding.Protocol, isIPv6 bool) error { return nil } diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index d23dc0bfd82..ed7ca859fe4 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -79,7 +79,7 @@ func (mr *MockInterfaceMockRecorder) AddLoadBalancer(arg0, arg1 interface{}) *go } // AddNodePort mocks base method -func (m *MockInterface) AddNodePort(arg0 map[int][]net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { +func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddNodePort", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) @@ -135,7 +135,7 @@ func (mr *MockInterfaceMockRecorder) DeleteLoadBalancer(arg0, arg1 interface{}) } // DeleteNodePort mocks base method -func (m *MockInterface) DeleteNodePort(arg0 map[int][]net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { +func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol, arg3 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteNodePort", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 6a490261f96..7f2e02b4783 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -170,10 +170,9 @@ func GetIPWithFamily(ips []net.IP, addrFamily uint8) (net.IP, error) { } } -// GetAllNodeIPs gets all Node IP addresses (not including IPv6 link local address). -func GetAllNodeIPs() (map[int][]net.IP, map[int][]net.IP, error) { - nodeIPv4Map := make(map[int][]net.IP) - nodeIPv6Map := make(map[int][]net.IP) +// GetAllNodeAddresses gets all Node IP addresses (not including IPv6 link local address). +func GetAllNodeAddresses() ([]net.IP, []net.IP, error) { + var nodeAddressesIPv4, nodeAddressesIPv6 []net.IP _, ipv6LinkLocalNet, _ := net.ParseCIDR("fe80::/64") // Get all interfaces. @@ -183,7 +182,6 @@ func GetAllNodeIPs() (map[int][]net.IP, map[int][]net.IP, error) { } for _, itf := range interfaces { // Get all IPs of every interface - ifIndex := itf.Index addrs, err := itf.Addrs() if err != nil { return nil, nil, err @@ -195,13 +193,13 @@ func GetAllNodeIPs() (map[int][]net.IP, map[int][]net.IP, error) { } if ip.To4() != nil { - nodeIPv4Map[ifIndex] = append(nodeIPv4Map[ifIndex], ip) + nodeAddressesIPv4 = append(nodeAddressesIPv4, ip) } else { - nodeIPv6Map[ifIndex] = append(nodeIPv6Map[ifIndex], ip) + nodeAddressesIPv6 = append(nodeAddressesIPv6, ip) } } } - return nodeIPv4Map, nodeIPv6Map, nil + return nodeAddressesIPv4, nodeAddressesIPv6, nil } // ExtendCIDRWithIP is used for extending an IPNet with an IP. diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 4537328294f..11e6505c4ec 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -279,34 +279,21 @@ type LearnAction interface { DeleteLearned() LearnAction MatchEthernetProtocolIP(isIPv6 bool) LearnAction MatchTransportDst(protocol Protocol) LearnAction - MatchTransportDstAsSrc(protocol Protocol) LearnAction - MatchNetworkSrcAsDst(isIPv6 bool) LearnAction MatchLearnedTCPDstPort() LearnAction MatchLearnedUDPDstPort() LearnAction MatchLearnedSCTPDstPort() LearnAction MatchLearnedTCPv6DstPort() LearnAction MatchLearnedUDPv6DstPort() LearnAction MatchLearnedSCTPv6DstPort() LearnAction - MatchLearnedTCPDstPortAsSrcPort() LearnAction - MatchLearnedUDPDstPortAsSrcPort() LearnAction - MatchLearnedSCTPDstPortAsSrcPort() LearnAction - MatchLearnedTCPv6DstPortAsSrcPort() LearnAction - MatchLearnedUDPv6DstPortAsSrcPort() LearnAction - MatchLearnedSCTPv6DstPortAsSrcPort() LearnAction MatchLearnedSrcIP() LearnAction MatchLearnedDstIP() LearnAction MatchLearnedSrcIPv6() LearnAction MatchLearnedDstIPv6() LearnAction - MatchLearnedSrcIPAsDstIP() LearnAction - MatchLearnedDstIPAsSrcIP() LearnAction - MatchLearnedSrcIPv6AsDstIPv6() LearnAction - MatchLearnedDstIPv6AsSrcIPv6() LearnAction MatchReg(regID int, data uint32, rng Range) LearnAction LoadReg(regID int, data uint32, rng Range) LearnAction LoadRegToReg(fromRegID, toRegID int, fromRng, toRng Range) LearnAction LoadXXRegToXXReg(fromRegID, toRegID int, fromRng, toRng Range) LearnAction SetDstMAC(mac net.HardwareAddr) LearnAction - SetLearnedSrcMACAsDstMAC() LearnAction Done() FlowBuilder } diff --git a/pkg/ovs/openflow/ofctrl_action.go b/pkg/ovs/openflow/ofctrl_action.go index b9f65bb61c8..4308188db6b 100644 --- a/pkg/ovs/openflow/ofctrl_action.go +++ b/pkg/ovs/openflow/ofctrl_action.go @@ -409,47 +409,6 @@ func (a *ofLearnAction) MatchTransportDst(protocol Protocol) LearnAction { return a } -// MatchTransportDstAsSrc specifies that the transport layer destination field -// {tcp|udp|sctp}_src in the learned flow must match the {tcp|udp|sctp}_dst field -// of the packet currently being processed. It only accepts ProtocolTCP, ProtocolUDP, or -// ProtocolSCTP, otherwise this does nothing. -func (a *ofLearnAction) MatchTransportDstAsSrc(protocol Protocol) LearnAction { - var ipProtoValue int - isIPv6 := false - switch protocol { - case ProtocolTCP: - ipProtoValue = ofctrl.IP_PROTO_TCP - case ProtocolUDP: - ipProtoValue = ofctrl.IP_PROTO_UDP - case ProtocolSCTP: - ipProtoValue = ofctrl.IP_PROTO_SCTP - case ProtocolTCPv6: - ipProtoValue = ofctrl.IP_PROTO_TCP - isIPv6 = true - case ProtocolUDPv6: - ipProtoValue = ofctrl.IP_PROTO_UDP - isIPv6 = true - case ProtocolSCTPv6: - ipProtoValue = ofctrl.IP_PROTO_SCTP - isIPv6 = true - default: - // Return directly if the protocol is not acceptable. - return a - } - - a.MatchEthernetProtocolIP(isIPv6) - ipTypeVal := make([]byte, 2) - ipTypeVal[1] = byte(ipProtoValue) - a.nxLearn.AddMatch(&ofctrl.LearnField{Name: "NXM_OF_IP_PROTO"}, 1*8, nil, ipTypeVal) - // OXM_OF fields support TCP, UDP and SCTP, but NXM_OF fields only support TCP and UDP. So here using "OXM_OF_" to - // generate the field name. - trimProtocol := strings.ReplaceAll(string(protocol), "v6", "") - learnFieldName := fmt.Sprintf("OXM_OF_%s_SRC", strings.ToUpper(trimProtocol)) - fromFieldName := fmt.Sprintf("OXM_OF_%s_DST", strings.ToUpper(trimProtocol)) - a.nxLearn.AddMatch(&ofctrl.LearnField{Name: learnFieldName}, 2*8, &ofctrl.LearnField{Name: fromFieldName}, nil) - return a -} - // MatchLearnedTCPDstPort specifies that the tcp_dst field in the learned flow // must match the tcp_dst of the packet currently being processed. func (a *ofLearnAction) MatchLearnedTCPDstPort() LearnAction { @@ -486,42 +445,6 @@ func (a *ofLearnAction) MatchLearnedSCTPv6DstPort() LearnAction { return a.MatchTransportDst(ProtocolSCTPv6) } -// MatchLearnedTCPDstPortAsSrcPort specifies that the tcp_src field in the learned flow -// must match the tcp_dst of the packet currently being processed. -func (a *ofLearnAction) MatchLearnedTCPDstPortAsSrcPort() LearnAction { - return a.MatchTransportDstAsSrc(ProtocolTCP) -} - -// MatchLearnedTCPv6DstPortAsSrcPort specifies that the tcp_src field in the learned flow -// must match the tcp_dst of the packet currently being processed. -func (a *ofLearnAction) MatchLearnedTCPv6DstPortAsSrcPort() LearnAction { - return a.MatchTransportDstAsSrc(ProtocolTCPv6) -} - -// MatchLearnedUDPDstPortAsSrcPort specifies that the udp_src field in the learned flow -// must match the udp_dst of the packet currently being processed. -func (a *ofLearnAction) MatchLearnedUDPDstPortAsSrcPort() LearnAction { - return a.MatchTransportDstAsSrc(ProtocolUDP) -} - -// MatchLearnedUDPv6DstPortAsSrcPort specifies that the udp_src field in the learned flow -// must match the udp_dst of the packet currently being processed. -func (a *ofLearnAction) MatchLearnedUDPv6DstPortAsSrcPort() LearnAction { - return a.MatchTransportDstAsSrc(ProtocolUDPv6) -} - -// MatchLearnedSCTPDstPortAsSrcPort specifies that the sctp_src field in the learned flow -// must match the sctp_dst of the packet currently being processed. -func (a *ofLearnAction) MatchLearnedSCTPDstPortAsSrcPort() LearnAction { - return a.MatchTransportDstAsSrc(ProtocolSCTP) -} - -// MatchLearnedSCTPv6DstPortAsSrcPort specifies that the sctp_src field in the learned flow -// must match the sctp_dst of the packet currently being processed. -func (a *ofLearnAction) MatchLearnedSCTPv6DstPortAsSrcPort() LearnAction { - return a.MatchTransportDstAsSrc(ProtocolSCTPv6) -} - // MatchLearnedSrcIP makes the learned flow to match the nw_src of current IP packet. func (a *ofLearnAction) MatchLearnedSrcIP() LearnAction { a.nxLearn.AddMatch(&ofctrl.LearnField{Name: "NXM_OF_IP_SRC"}, 4*8, &ofctrl.LearnField{Name: "NXM_OF_IP_SRC"}, nil) @@ -546,45 +469,6 @@ func (a *ofLearnAction) MatchLearnedDstIPv6() LearnAction { return a } -// MatchLearnedSrcIPAsDstIP makes the learned flow to match the nw_dst of current IP packet's nw_src. -func (a *ofLearnAction) MatchLearnedSrcIPAsDstIP() LearnAction { - a.nxLearn.AddMatch(&ofctrl.LearnField{Name: "NXM_OF_IP_DST"}, 4*8, &ofctrl.LearnField{Name: "NXM_OF_IP_SRC"}, nil) - return a -} - -// MatchLearnedDstIPAsSrcIP makes the learned flow to match the nw_src of current IP packet's nw_dst. -func (a *ofLearnAction) MatchLearnedDstIPAsSrcIP() LearnAction { - a.nxLearn.AddMatch(&ofctrl.LearnField{Name: "NXM_OF_IP_SRC"}, 4*8, &ofctrl.LearnField{Name: "NXM_OF_IP_DST"}, nil) - return a -} - -// MatchLearnedSrcIPv6AsDstIPv6 makes the learned flow to match the ipv6_dst of current IPv6 packet's ipv6_src. -func (a *ofLearnAction) MatchLearnedSrcIPv6AsDstIPv6() LearnAction { - a.nxLearn.AddMatch(&ofctrl.LearnField{Name: "NXM_NX_IPV6_DST"}, 16*8, &ofctrl.LearnField{Name: "NXM_NX_IPV6_SRC"}, nil) - return a -} - -// MatchLearnedDstIPv6AsSrcIPv6 makes the learned flow to match the ipv6_src of current IPv6 packet's ipv6_dst. -func (a *ofLearnAction) MatchLearnedDstIPv6AsSrcIPv6() LearnAction { - a.nxLearn.AddMatch(&ofctrl.LearnField{Name: "NXM_NX_IPV6_SRC"}, 16*8, &ofctrl.LearnField{Name: "NXM_NX_IPV6_DST"}, nil) - return a -} - -// MatchNetworkSrcAsDst makes the learned flow to match the network nw_src/ipv6_src of current packet's nw_dst/ipv6_dst. -func (a *ofLearnAction) MatchNetworkSrcAsDst(isIPv6 bool) LearnAction { - learnBits := uint16(32) - from := "NXM_OF_IP_SRC" - to := "NXM_OF_IP_DST" - if isIPv6 { - learnBits = 128 - from = "NXM_NX_IPV6_SRC" - to = "NXM_NX_IPV6_DST" - } - a.MatchEthernetProtocolIP(isIPv6) - a.nxLearn.AddMatch(&ofctrl.LearnField{Name: to}, learnBits, &ofctrl.LearnField{Name: from}, nil) - return a -} - // MatchReg makes the learned flow to match the data in the reg of specific range. func (a *ofLearnAction) MatchReg(regID int, data uint32, rng Range) LearnAction { toField := &ofctrl.LearnField{Name: fmt.Sprintf("NXM_NX_REG%d", regID), Start: uint16(rng[0])} @@ -647,13 +531,6 @@ func (a *ofLearnAction) SetDstMAC(mac net.HardwareAddr) LearnAction { return a } -func (a *ofLearnAction) SetLearnedSrcMACAsDstMAC() LearnAction { - toField := &ofctrl.LearnField{Name: "NXM_OF_ETH_DST"} - fromField := &ofctrl.LearnField{Name: "NXM_OF_ETH_SRC"} - a.nxLearn.AddLoadAction(toField, 48, fromField, nil) - return a -} - func (a *ofLearnAction) Done() FlowBuilder { a.flowBuilder.ApplyAction(a.nxLearn) return a.flowBuilder diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index f3c04b3a284..ab12cfc6cb6 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -574,7 +574,7 @@ func installServiceFlows(t *testing.T, gid uint32, svc svcConfig, endpointList [ assert.NoError(t, err, "no error should return when installing flows for Endpoints") err = c.InstallServiceGroup(groupID, svc.withSessionAffinity, endpointList) assert.NoError(t, err, "no error should return when installing groups for Service") - err = c.InstallServiceFlows(groupID, svc.ip, svc.port, svc.protocol, stickyMaxAgeSeconds) + err = c.InstallServiceFlows(groupID, svc.ip, svc.port, svc.protocol, stickyMaxAgeSeconds, false) assert.NoError(t, err, "no error should return when installing flows for Service") }