Skip to content

Commit

Permalink
For CR
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl committed Aug 17, 2021
1 parent 49d7e53 commit 8d272a7
Show file tree
Hide file tree
Showing 16 changed files with 145 additions and 545 deletions.
14 changes: 7 additions & 7 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,27 +174,27 @@ func run(o *Options) error {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
proxyFull := o.config.AntreaProxyFull
var nodePortIPv4Map, nodePortIPv6Map map[int][]net.IP
var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP
if proxyFull {
nodePortIPv4Map, nodePortIPv6Map, err = getAvailableNodePortIPs(o.config.NodePortAddresses, o.config.HostGateway)
nodePortAddressesIPv4, nodePortAddressesIPv6, err = getAvailableNodePortAddresses(o.config.NodePortAddresses)
if err != nil {
return fmt.Errorf("getting available NodePort IP addresses failed: %v", err)
}
if v4Enabled && len(nodePortIPv4Map) == 0 {
if v4Enabled && len(nodePortAddressesIPv4) == 0 {
return fmt.Errorf("no qualified NodePort IPv4 addresses was found")
}
if v6Enabled && len(nodePortIPv6Map) == 0 {
if v6Enabled && len(nodePortAddressesIPv6) == 0 {
return fmt.Errorf("no qualified NodePort IPv6 addresses was found")
}
}

switch {
case v4Enabled && v6Enabled:
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortIPv4Map, nodePortIPv6Map, proxyFull)
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyFull)
case v4Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortIPv4Map, proxyFull)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyFull)
case v6Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortIPv6Map, proxyFull)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyFull)
default:
return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled")
}
Expand Down
35 changes: 13 additions & 22 deletions cmd/antrea-agent/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,36 @@ import (
"antrea.io/antrea/pkg/agent/util"
)

func getAvailableNodePortIPs(nodePortIPsFromConfig []string, gateway string) (map[int][]net.IP, map[int][]net.IP, error) {
func getAvailableNodePortAddresses(nodePortAddressesFromConfig []string) ([]net.IP, []net.IP, error) {
// Get all IP addresses of Node
nodeIPv4Map, nodeIPv6Map, err := util.GetAllNodeIPs()
nodeAddressesIPv4, nodeAddressesIPv6, err := util.GetAllNodeAddresses()
if err != nil {
return nil, nil, err
}
// IP address of Antrea gateway should not be NodePort IP as it cannot be accessed from outside the Cluster.
gatewayIfIndex := util.GetIndexByName(gateway)
delete(nodeIPv4Map, gatewayIfIndex)
delete(nodeIPv6Map, gatewayIfIndex)

// If option `NodePortAddresses` is not set, then all Node IP addresses will be used as NodePort IP address.
if len(nodePortIPsFromConfig) == 0 {
return nodeIPv4Map, nodeIPv6Map, nil
if len(nodePortAddressesFromConfig) == 0 {
return nodeAddressesIPv4, nodeAddressesIPv6, nil
}

var nodePortIPNets []*net.IPNet
for _, nodePortIP := range nodePortIPsFromConfig {
for _, nodePortIP := range nodePortAddressesFromConfig {
_, ipNet, _ := net.ParseCIDR(nodePortIP)
nodePortIPNets = append(nodePortIPNets, ipNet)
}

nodePortIPv4Map, nodePortIPv6Map := make(map[int][]net.IP), make(map[int][]net.IP)
var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP
for _, nodePortIPNet := range nodePortIPNets {
for index, ips := range nodeIPv4Map {
for i := range ips {
if nodePortIPNet.Contains(ips[i]) {
nodePortIPv4Map[index] = append(nodePortIPv4Map[index], ips[i])
}
for i := range nodeAddressesIPv4 {
if nodePortIPNet.Contains(nodeAddressesIPv4[i]) {
nodePortAddressesIPv4 = append(nodePortAddressesIPv4, nodeAddressesIPv4[i])
}
}
for index, ips := range nodeIPv6Map {
for i := range ips {
if nodePortIPNet.Contains(ips[i]) {
nodePortIPv6Map[index] = append(nodePortIPv6Map[index], ips[i])
}
for i := range nodeAddressesIPv6 {
if nodePortIPNet.Contains(nodeAddressesIPv6[i]) {
nodePortAddressesIPv6 = append(nodePortAddressesIPv6, nodeAddressesIPv6[i])
}
}
}

return nodePortIPv4Map, nodePortIPv6Map, nil
return nodePortAddressesIPv4, nodePortAddressesIPv6, nil
}
2 changes: 0 additions & 2 deletions pkg/agent/config/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ var (
// host Service routing entry.
ServiceGWHairpinIPv4 = net.ParseIP("169.254.169.253")
ServiceGWHairpinIPv6 = net.ParseIP("fc01::aabb:ccdd:eeff")

DummyNodePortSvcIP = net.ParseIP("0.0.0.0")
)

type GatewayConfig struct {
Expand Down
53 changes: 5 additions & 48 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net"

"antrea.io/libOpenflow/protocol"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
Expand Down Expand Up @@ -109,7 +108,7 @@ type Client interface {
// action to maintain the LB decision.
// The group with the groupID must be installed before, otherwise the
// installation will fail.
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error
// UninstallServiceFlows removes flows installed by InstallServiceFlows.
UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error
// InstallLoadBalancerServiceFromOutsideFlows installs flows for LoadBalancer Service traffic from outside node.
Expand All @@ -120,19 +119,6 @@ type Client interface {
// UninstallLoadBalancerServiceFromOutsideFlows removes flows installed by InstallLoadBalancerServiceFromOutsideFlows.
UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error

// InstallInitNodePortClassifierFlows installs the first clause flow of conjunction which is used to classify the first packet of
// Service NodePort, with every NodePort IP address as destination IP address.
InstallInitNodePortClassifierFlows(nodePortIPMap map[int][]net.IP, isIPv6 bool) error

// InstallServiceClassifierFlow installs flows to classify the first packet of Service. For NodePort/LoadBalancer
// whose externalTrafficPolicy is Cluster, or NodePort/LoadBalancer whose externalTrafficPolicy is Local and client
// is from localhost, the flow will set a register to indicate that the packet requires SNAT. The flow will also
// generate a learned flow to rewrite the destination MAC of response packet whose request packet is from remote
// client.
InstallServiceClassifierFlow(svcType v1.ServiceType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, nodeLocalExternal bool) error
// UninstallServiceClassifierFlow removes flows installed by InstallServiceClassifierFlow.
UninstallServiceClassifierFlow(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error

// GetFlowTableStatus should return an array of flow table status, all existing flow tables should be included in the list.
GetFlowTableStatus() []binding.TableStatus

Expand Down Expand Up @@ -537,10 +523,6 @@ func generateServicePortFlowCacheKey(svcIP net.IP, svcPort uint16, protocol bind
return fmt.Sprintf("S%s%s%x", svcIP, protocol, svcPort)
}

func generateServiceClassifierFlowCacheKey(svcIP net.IP, svcPort uint16, protocol binding.Protocol) string {
return fmt.Sprintf("S%s%s%x/C", svcIP, protocol, svcPort)
}

func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
Expand Down Expand Up @@ -585,38 +567,13 @@ func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint prox
return c.deleteFlows(c.serviceFlowCache, cacheKey)
}

func (c *client) InstallInitNodePortClassifierFlows(nodePortIPMap map[int][]net.IP, isIPv6 bool) error {
flows := c.initServiceClassifierFlows(nodePortIPMap, isIPv6)
if err := c.ofEntryOperations.AddAll(flows); err != nil {
return err
}
c.defaultServiceFlows = append(c.defaultServiceFlows, flows...)
return nil
}

func (c *client) InstallServiceClassifierFlow(svcType v1.ServiceType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, nodeLocalExternal bool) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

flows := c.serviceClassifierFlow(svcType, svcIP, svcPort, protocol, nodeLocalExternal)
cacheKey := generateServiceClassifierFlowCacheKey(svcIP, svcPort, protocol)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
}

func (c *client) UninstallServiceClassifierFlow(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
cacheKey := generateServiceClassifierFlowCacheKey(svcIP, svcPort, protocol)
return c.deleteFlows(c.serviceFlowCache, cacheKey)
}

func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error {
func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.serviceLBFlows(groupID, svcIP, svcPort, protocol, affinityTimeout != 0)...)
flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal))
if affinityTimeout != 0 {
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal))
}
cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
Expand Down Expand Up @@ -666,7 +623,7 @@ func (c *client) InstallDefaultServiceFlows() error {
if err := c.ofEntryOperations.AddAll(flows); err != nil {
return err
}
c.defaultServiceFlows = append(c.defaultServiceFlows, flows...)
c.defaultServiceFlows = flows
return nil
}

Expand Down
Loading

0 comments on commit 8d272a7

Please sign in to comment.