Skip to content

Commit

Permalink
rebase and add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Qiu <[email protected]>
  • Loading branch information
wenqiq committed Apr 27, 2022
1 parent 6979aa2 commit 22b4aa2
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 118 deletions.
178 changes: 124 additions & 54 deletions pkg/agent/controller/trafficcontrol/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package trafficcontrol

import (
"fmt"
"strconv"
"strings"
"sync"
"time"

uuid "github.com/satori/go.uuid"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -41,15 +43,26 @@ import (
)

const (
controllerName = "TrafficControlController"
defaultWorkers = 4
resyncPeriod = 0
controllerName = "TrafficControlController"
defaultWorkers = 4
resyncPeriod = 0
defaultVXLANTunnelDestinationPort = 4789
defaultGENEVETunnelDestinationPort = 6081
portNamePrefixVXLAN = "vxlan"
portNamePrefixGENEVE = "geneve"
portNamePrefixGRE = "gre"
portNamePrefixERSPAN = "erspan"
)

var (
// 1133a2c3-a42c-4c36-9396-5f17ec9541c5 was generated using uuid.NewV4() function.
uuidNamespace = uuid.FromStringOrNil("1133a2c3-a42c-4c36-9396-5f17ec9541c5")
)

type tcState struct {
ofPorts sets.Int32
pods sets.String
dstOfInPort uint32
targetPort uint32
dstOfOutPort uint32
}

Expand Down Expand Up @@ -241,50 +254,90 @@ func (c *Controller) filterPods(appliedTo *v1alpha2.AppliedTo) ([]*v1.Pod, error
return c.podLister.List(podSelector)
}

func (c *Controller) getDevicePort(device *v1alpha2.OVSPort) (ofPort uint32, err error) {
// TODO:Not tested yet
if device == nil || device.Name == "" {
return 0, fmt.Errorf("device is invalid")
func genName(name string) string {
return uuid.NewV5(uuidNamespace, name).String()
}

func genPortNameUDPTunnel(tunnel *v1alpha2.UDPTunnel) string {
names := []string{tunnel.RemoteIP}
if tunnel.DestinationPort != nil {
names = append(names, strconv.Itoa(int(*tunnel.DestinationPort)))
}
if tunnel.VNI != nil {
names = append(names, strconv.Itoa(int(*tunnel.VNI)))
}
normalizedName := strings.Join(names, " And ")
return genName(normalizedName)
}

func genPortNameGRETunnel(tunnel *v1alpha2.GRETunnel) string {
names := []string{tunnel.RemoteIP}
if tunnel.Key != nil {
names = append(names, strconv.Itoa(int(*tunnel.Key)))
}
normalizedName := strings.Join(names, " And ")
return genName(normalizedName)
}

func genPortNameERSPANTunnel(tunnel *v1alpha2.ERSPANTunnel) string {
names := []string{tunnel.RemoteIP, strconv.Itoa(int(tunnel.Version))}
if tunnel.SessionID != nil {
names = append(names, strconv.Itoa(int(*tunnel.SessionID)))
}
if tunnel.Index != nil {
names = append(names, strconv.Itoa(int(*tunnel.Index)))
}
itf, ok := c.interfaceStore.GetInterfaceByName(device.Name)
if ok {
return uint32(itf.OFPort), nil
if tunnel.Dir != nil {
names = append(names, strconv.Itoa(int(*tunnel.Dir)))
}
if ofPort, err := c.ovsBridgeClient.GetOFPort(device.Name, false); err == nil {
return uint32(ofPort), nil
if tunnel.HardwareID != nil {
names = append(names, strconv.Itoa(int(*tunnel.HardwareID)))
}
normalizedName := strings.Join(names, " And ")
return genName(normalizedName)
}

func (c *Controller) getDevicePort(device *v1alpha2.TrafficControlPort) (ofPort uint32, err error) {
// TODO:Not tested yet
if device == nil {
return 0, fmt.Errorf("device is invalid")
}
var ofPortTmp int32
createTunnelPortExt := func(tunnelType ovsconfig.TunnelType) error {
tunnelConfig := device.TunnelConfig
remoteIP := tunnelConfig.RemoteIP
createUDPTunnel := func(tunnelType ovsconfig.TunnelType, portName, remoteIP string, dstPort int32) error {
externalIDs := map[string]interface{}{
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaTunnel,
}
if dstPort != 0 {
externalIDs["dst_port"] = dstPort
}
c.ovsPortUpdateMutex.Lock()
defer c.ovsPortUpdateMutex.Unlock()
portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(
device.Name, tunnelType, 0, false, "", remoteIP, "", externalIDs)
portName, tunnelType, 0, false, "", remoteIP, "", externalIDs)
if err != nil {
return err
}
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(device.Name, false)
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(portName, false)
if err != nil {
return err
}
itf := interfacestore.NewTunnelInterface(device.Name, tunnelType, nil, false)
itf := interfacestore.NewTunnelInterface(portName, tunnelType, nil, false)
itf.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPortTmp}
c.interfaceStore.AddInterface(itf)
ofPort = uint32(ofPortTmp)
return nil
}
switch device.Type {
case v1alpha2.OVSPortTypeInternal:
switch {
case device.OVSInternal != nil:
if ofPort, err := c.ovsBridgeClient.GetOFPort(device.OVSInternal.Name, false); err == nil {
return uint32(ofPort), nil
}
externalIDs := map[string]interface{}{
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaUnset,
}
c.ovsPortUpdateMutex.Lock()
defer c.ovsPortUpdateMutex.Unlock()
portUUID, err := c.ovsBridgeClient.CreateInternalPort(device.Name, 0, externalIDs)
portUUID, err := c.ovsBridgeClient.CreateInternalPort(device.OVSInternal.Name, 0, externalIDs)
if err != nil {
return 0, err
}
Expand All @@ -293,22 +346,26 @@ func (c *Controller) getDevicePort(device *v1alpha2.OVSPort) (ofPort uint32, err
_ = c.ovsBridgeClient.DeletePort(portUUID)
}
}()
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(device.Name, false)
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(device.OVSInternal.Name, false)
if err != nil {
return 0, err
}
intf := interfacestore.NewGatewayInterface(device.Name)
intf := interfacestore.NewGatewayInterface(device.OVSInternal.Name)
intf.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPortTmp}
c.interfaceStore.AddInterface(intf)
ofPort = uint32(ofPortTmp)
klog.V(2).InfoS("Create internal port", "portUUID", portUUID)
case v1alpha2.OVSPortTypeDevice:
case device.Device != nil:
itf, ok := c.interfaceStore.GetInterfaceByName(device.Device.Name)
if ok {
return uint32(itf.OFPort), nil
}
externalIDs := map[string]interface{}{
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaUnset,
}
c.ovsPortUpdateMutex.Lock()
defer c.ovsPortUpdateMutex.Unlock()
portUUID, err := c.ovsBridgeClient.CreatePort(device.Name, device.Name, externalIDs)
portUUID, err := c.ovsBridgeClient.CreatePort(device.Device.Name, device.Device.Name, externalIDs)
if err != nil {
return 0, err
}
Expand All @@ -317,31 +374,42 @@ func (c *Controller) getDevicePort(device *v1alpha2.OVSPort) (ofPort uint32, err
_ = c.ovsBridgeClient.DeletePort(portUUID)
}
}()
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(device.Name, false)
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(device.Device.Name, false)
if err != nil {
return 0, err
}
itf := interfacestore.NewUplinkInterface(device.Name)
itf.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPortTmp}
c.interfaceStore.AddInterface(itf)
newItf := interfacestore.NewUplinkInterface(device.Device.Name)
newItf.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPortTmp}
c.interfaceStore.AddInterface(newItf)
ofPort = uint32(ofPortTmp)
klog.V(2).InfoS("Create device", "portUUID", portUUID)
case v1alpha2.OVSPortTypeVXLAN:
if err := createTunnelPortExt(ovsconfig.VXLANTunnel); err != nil {
case device.VXLAN != nil:
dstPort := int32(defaultVXLANTunnelDestinationPort)
if device.VXLAN.DestinationPort != nil {
dstPort = *device.VXLAN.DestinationPort
}
portName := strings.Join([]string{portNamePrefixVXLAN, genPortNameUDPTunnel(device.VXLAN)}, "-")
if err := createUDPTunnel(ovsconfig.VXLANTunnel, portName, device.VXLAN.RemoteIP, dstPort); err != nil {
return 0, err
}
klog.V(2).InfoS("Create VXLANTunnel port", "portName", device.Name)
case v1alpha2.OVSPortTypeGENEVE:
if err := createTunnelPortExt(ovsconfig.GeneveTunnel); err != nil {
klog.V(2).InfoS("Create VXLANTunnel port", "config", device.VXLAN)
case device.GENEVE != nil:
dstPort := int32(defaultGENEVETunnelDestinationPort)
if device.VXLAN.DestinationPort != nil {
dstPort = *device.VXLAN.DestinationPort
}
portName := strings.Join([]string{portNamePrefixGENEVE, genPortNameUDPTunnel(device.GENEVE)}, "-")
if err := createUDPTunnel(ovsconfig.GeneveTunnel, portName, device.GENEVE.RemoteIP, dstPort); err != nil {
return 0, err
}
klog.V(2).InfoS("Create Geneve Tunnel port", "portName", device.Name)
case v1alpha2.OVSPortTypeGRE:
if err := createTunnelPortExt(ovsconfig.GRETunnel); err != nil {
klog.V(2).InfoS("Create Geneve Tunnel port", "config", device.GENEVE)
case device.GRE != nil:
portName := strings.Join([]string{portNamePrefixGRE, genPortNameGRETunnel(device.GRE)}, "-")
if err := createUDPTunnel(ovsconfig.GRETunnel, portName, device.GRE.RemoteIP, 0); err != nil {
return 0, err
}
klog.V(2).InfoS("Create GRETunnel port", "portName", device.Name)
case v1alpha2.OVSPortTypeERSPAN:
klog.V(2).InfoS("Create GRETunnel port", "config", device.GRE)
case device.ERSPAN != nil:
// ERSPAN version I and version II over IPv4 GRE and IPv6 GRE tunnel are supported. See ovs-fields(7) for matching and setting ERSPAN fields.
// $ ovs-vsctl add-br br0
// $ #For ERSPAN type 2 (version I)
Expand All @@ -356,14 +424,16 @@ func (c *Controller) getDevicePort(device *v1alpha2.OVSPort) (ofPort uint32, err
// options:remote_ip=172.31.1.1 \
// options:erspan_ver=2 options:erspan_dir=1 \
// options:erspan_hwid=4
config := device.ERSPANConfig
tunnelID := config.TunnelID

config := device.ERSPAN
remoteIP := config.RemoteIP
version := config.Version
index := config.Index
dir := config.Dir
hardwareID := config.HardwareID

portName := strings.Join([]string{portNamePrefixERSPAN, genPortNameERSPANTunnel(device.ERSPAN)}, "-")

externalIDs := map[string]interface{}{
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaUnset,
"erspan_ver": version,
Expand All @@ -379,21 +449,21 @@ func (c *Controller) getDevicePort(device *v1alpha2.OVSPort) (ofPort uint32, err
}
c.ovsPortUpdateMutex.Lock()
defer c.ovsPortUpdateMutex.Unlock()
portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(device.Name,
portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(portName,
ovsconfig.ERSpanTunnel, 0, false, "", remoteIP, "", externalIDs)
if err != nil {
return 0, err
}
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(device.Name, false)
ofPortTmp, err = c.ovsBridgeClient.GetOFPort(portName, false)
if err != nil {
return 0, err
}
itf := interfacestore.NewTunnelInterface(device.Name, ovsconfig.ERSpanTunnel, nil, false)
itf := interfacestore.NewTunnelInterface(portName, ovsconfig.ERSpanTunnel, nil, false)
itf.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPortTmp}
c.interfaceStore.AddInterface(itf)
ofPort = uint32(ofPortTmp)

klog.V(2).InfoS("Create ERSPAN port", "tunnelID", tunnelID, "remoteIP", remoteIP, "version", version, "index", index,
klog.V(2).InfoS("Create ERSPAN port", "remoteIP", remoteIP, "version", version, "index", index,
"dir", dir, "hardwareID", hardwareID)
}
return
Expand Down Expand Up @@ -424,21 +494,21 @@ func (c *Controller) syncTrafficControl(tcName string) (err error) {

if !exist {
tcState = c.createTcState(tcName)
var dstInPort, dstOutPort uint32
var returnPort, targetPort uint32
// ReturnPort should only be set for Redirect action.The validation webhook checked it.
if tc.Spec.ReturnPort != nil {
if dstOutPort, err = c.getDevicePort(tc.Spec.ReturnPort); err != nil {
if returnPort, err = c.getDevicePort(tc.Spec.ReturnPort); err != nil {
return err
}
}
if dstInPort, err = c.getDevicePort(&tc.Spec.TargetPort); err != nil {
if targetPort, err = c.getDevicePort(&tc.Spec.TargetPort); err != nil {
return err
}
if err := c.ofClient.InstallTrafficControlInitFlows(dstInPort, dstOutPort, tc.Spec.Action); err != nil {
if err := c.ofClient.InstallTrafficControlCommonFlows(targetPort, returnPort); err != nil {
return err
}
tcState.dstOfInPort = dstInPort
tcState.dstOfOutPort = dstOutPort
tcState.targetPort = targetPort
tcState.dstOfOutPort = returnPort
}
var pods []*v1.Pod
if pods, err = c.filterPods(&tc.Spec.AppliedTo); err != nil {
Expand All @@ -463,7 +533,7 @@ func (c *Controller) syncTrafficControl(tcName string) (err error) {
podOfPorts = append(podOfPorts, uint32(ofPort))
}

if err := c.ofClient.InstallTrafficControlMarkFlows(tc.Name, podOfPorts, tcState.dstOfInPort, tc.Spec.Direction); err != nil {
if err := c.ofClient.InstallTrafficControlMarkFlows(tc.Name, podOfPorts, tcState.targetPort, tc.Spec.Direction, tc.Spec.Action); err != nil {
return err
}

Expand All @@ -475,7 +545,7 @@ func (c *Controller) uninstallTrafficControl(tcName string, ts *tcState) error {
return err
}

if err := c.ofClient.UninstallTrafficControlInitFlows(ts.dstOfInPort); err != nil {
if err := c.ofClient.UninstallTrafficControlCommonFlows(ts.targetPort); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 22b4aa2

Please sign in to comment.