Skip to content

Commit

Permalink
Add excepts for egress to avoid SNAT (antrea-io#2707)
Browse files Browse the repository at this point in the history
Avoid SNAT for assigned IP block if the pod wants to communicate
directly, this can improve network performance

Signed-off-by: Yang Li [email protected]
  • Loading branch information
leonstack committed Sep 9, 2021
1 parent 589e1f7 commit 7247559
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 69 deletions.
8 changes: 8 additions & 0 deletions build/yamls/base/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ spec:
oneOf:
- format: ipv4
- format: ipv6
excepts:
items:
properties:
cidr:
type: string
format: cidr
type: object
type: array
externalIPPool:
type: string
status:
Expand Down
27 changes: 23 additions & 4 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -81,6 +82,8 @@ type egressState struct {
egressIP string
// The actual datapath mark of this Egress. Used to check if the mark changes since last process.
mark uint32
// The except Info for bypass SNAT flows
excepts []crdv1a2.Except
// The actual openflow ports for which we have installed SNAT rules. Used to identify stale openflow ports when
// updating or deleting an Egress.
ofPorts sets.Int32
Expand Down Expand Up @@ -464,11 +467,12 @@ func (c *EgressController) deleteEgressState(egressName string) {
delete(c.egressStates, egressName)
}

func (c *EgressController) newEgressState(egressName string, egressIP string) *egressState {
func (c *EgressController) newEgressState(egressName string, egressIP string, excepts []crdv1a2.Except) *egressState {
c.egressStatesMutex.Lock()
defer c.egressStatesMutex.Unlock()
state := &egressState{
egressIP: egressIP,
excepts: excepts,
ofPorts: sets.NewInt32(),
pods: sets.NewString(),
}
Expand Down Expand Up @@ -592,7 +596,7 @@ func (c *EgressController) syncEgress(egressName string) error {
return nil
}
if !exist {
eState = c.newEgressState(egressName, egress.Spec.EgressIP)
eState = c.newEgressState(egressName, egress.Spec.EgressIP, egress.Spec.Excepts)
}

localNodeSelected, err := c.cluster.ShouldSelectEgress(egress)
Expand All @@ -619,12 +623,13 @@ func (c *EgressController) syncEgress(egressName string) error {

// If the mark changes, uninstall all of the Egress's Pod flows first, then installs them with new mark.
// It could happen when the Egress IP is added to or removed from the Node.
if eState.mark != mark {
if eState.mark != mark || !Equal(eState.excepts, egress.Spec.Excepts) {
// Uninstall all of its Pod flows.
if err := c.uninstallPodFlows(egressName, eState, eState.ofPorts, eState.pods); err != nil {
return err
}
eState.mark = mark
eState.excepts = egress.Spec.Excepts
}

if err := c.updateEgressStatus(egress, c.localIPDetector.IsLocalIP(egress.Spec.EgressIP)); err != nil {
Expand Down Expand Up @@ -671,7 +676,7 @@ func (c *EgressController) syncEgress(egressName string) error {
staleOFPorts.Delete(ofPort)
continue
}
if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil {
if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark, eState.excepts); err != nil {
return err
}
eState.ofPorts.Insert(ofPort)
Expand Down Expand Up @@ -872,3 +877,17 @@ func (c *EgressController) deleteEgressGroup(group *cpv1b2.EgressGroup) {
delete(c.egressGroups, group.Name)
c.queue.Add(group.Name)
}

func Equal(a, b []crdv1a2.Except) bool {
if len(a) != len(b) {
return false
}
sort.Slice(a, func(i, j int) bool {return a[i].CIDR < a[j].CIDR})
sort.Slice(b, func(i, j int) bool {return b[i].CIDR < b[j].CIDR})
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
113 changes: 61 additions & 52 deletions pkg/agent/controller/egress/egress_controller_test.go

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/third_party/proxy"
Expand Down Expand Up @@ -176,7 +177,7 @@ type Client interface {
// tunnel destination, and the packets should be SNAT'd on the remote
// Node. As of now, a Pod can be configured to use only a single SNAT
// IP in a single address family (IPv4 or IPv6).
InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32) error
InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32, excepts []crdv1a2.Except) error

// UninstallPodSNATFlows removes the SNAT flows for the local Pod.
UninstallPodSNATFlows(ofPort uint32) error
Expand Down Expand Up @@ -806,8 +807,8 @@ func (c *client) UninstallSNATMarkFlows(mark uint32) error {
return c.deleteFlows(c.snatFlowCache, cacheKey)
}

func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32) error {
flows := []binding.Flow{c.snatRuleFlow(ofPort, snatIP, snatMark, c.nodeConfig.GatewayConfig.MAC)}
func (c *client) InstallPodSNATFlows(ofPort uint32, snatIP net.IP, snatMark uint32, excepts []crdv1a2.Except) error {
flows := c.snatRuleFlows(ofPort, snatIP, snatMark, c.nodeConfig.GatewayConfig.MAC, excepts)
cacheKey := fmt.Sprintf("p%x", ofPort)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
Expand Down
27 changes: 21 additions & 6 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
Expand Down Expand Up @@ -1927,26 +1928,39 @@ func (c *client) snatIPFromTunnelFlow(snatIP net.IP, mark uint32) binding.Flow {
Done()
}

// snatRuleFlow generates a flow that applies the SNAT rule for a local Pod. If
// snatRuleFlows generates flows that applies the SNAT rule for a local Pod. If
// the SNAT IP exists on the local Node, it sets the packet mark with the ID of
// the SNAT IP, for the traffic from the ofPort to external; if the SNAT IP is
// on a remote Node, it tunnels the packets to the SNAT IP.
func (c *client) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint32, localGatewayMAC net.HardwareAddr) binding.Flow {
func (c *client) snatRuleFlows(ofPort uint32, snatIP net.IP, snatMark uint32, localGatewayMAC net.HardwareAddr, excepts []crdv1a2.Except) []binding.Flow {
ipProto := getIPProtocol(snatIP)
snatTable := c.pipeline[snatTable]
l3FwdTable := c.pipeline[l3ForwardingTable]
nextTable := l3FwdTable.GetNext()
snatFlows := []binding.Flow{}
if snatMark != 0 {
// Local SNAT IP.
return snatTable.BuildFlow(priorityNormal).
return append(snatFlows, snatTable.BuildFlow(priorityNormal).
MatchProtocol(ipProto).
MatchCTStateNew(true).MatchCTStateTrk(true).
MatchInPort(ofPort).
Action().LoadPktMarkRange(snatMark, snatPktMarkRange).
Action().GotoTable(snatTable.GetNext()).
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done()
Done())
}
for _, except := range excepts {
_, excidr, _ := net.ParseCIDR(except.CIDR)
snatFlows = append(snatFlows, l3FwdTable.BuildFlow(priorityNormal).
MatchInPort(ofPort).
MatchProtocol(ipProto).
MatchDstIPNet(*excidr).
Action().GotoTable(nextTable).
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done())
}
// SNAT IP should be on a remote Node.
return snatTable.BuildFlow(priorityNormal).
snatFlows = append(snatFlows, snatTable.BuildFlow(priorityNormal).
MatchProtocol(ipProto).
MatchInPort(ofPort).
Action().SetSrcMAC(localGatewayMAC).
Expand All @@ -1955,7 +1969,8 @@ func (c *client) snatRuleFlow(ofPort uint32, snatIP net.IP, snatMark uint32, loc
Action().SetTunnelDst(snatIP).
Action().GotoTable(l3DecTTLTable).
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done()
Done())
return snatFlows
}

// loadBalancerServiceFromOutsideFlow generates the flow to forward LoadBalancer service traffic from outside node
Expand Down
9 changes: 5 additions & 4 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/crd/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ type EgressStatus struct {
EgressNode string `json:"egressNode"`
}

type Except struct {
CIDR string `json:"cidr"`
}

// EgressSpec defines the desired state for Egress.
type EgressSpec struct {
// AppliedTo selects Pods to which the Egress will be applied.
Expand All @@ -226,6 +230,7 @@ type EgressSpec struct {
// If it is non-empty, the EgressIP will be assigned to a Node specified by the pool automatically and will failover
// to a different Node when the Node becomes unreachable.
ExternalIPPool string `json:"externalIPPool"`
Excepts []Except `json:"excepts"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

0 comments on commit 7247559

Please sign in to comment.