From 4e6060072bbd7e47aae80c924a8215ab8caadb42 Mon Sep 17 00:00:00 2001 From: wenqi Date: Fri, 10 Jun 2022 19:01:26 +0800 Subject: [PATCH] Add trafficControlController to handle TrafficControl requests (#3487) - Use label selectors to filter Pods running on current Node. - Translate the selected Pods to OVS ports, which will be used to filter the packets that should be mirrored or redirected. - Translate the target device to the OVS port, which will be used as the target port the traffic should be mirrored or redirected. - Install OpenFlow rules calculated using the above arguments. Signed-off-by: Hongliang Liu Co-authored-by: Quan Tian Co-authored-by: Wenqi Qiu --- cmd/antrea-agent/agent.go | 21 +- pkg/agent/agent.go | 5 +- .../noderoute/node_route_controller.go | 1 + .../noderoute/node_route_controller_test.go | 6 +- .../controller/trafficcontrol/controller.go | 1028 +++++++++++++ .../trafficcontrol/controller_test.go | 1349 +++++++++++++++++ pkg/agent/interfacestore/types.go | 10 + pkg/apis/crd/v1alpha2/register.go | 2 + pkg/ovs/ovsconfig/interfaces.go | 3 +- pkg/ovs/ovsconfig/ovs_client.go | 21 +- pkg/ovs/ovsconfig/testing/mock_ovsconfig.go | 8 +- test/e2e/trafficcontrol_test.go | 338 +++++ test/integration/ovs/ovs_client_test.go | 2 +- 13 files changed, 2776 insertions(+), 18 deletions(-) create mode 100644 pkg/agent/controller/trafficcontrol/controller.go create mode 100644 pkg/agent/controller/trafficcontrol/controller_test.go create mode 100644 test/e2e/trafficcontrol_test.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ded7abcea22..4172665599b 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -41,6 +41,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" + "antrea.io/antrea/pkg/agent/controller/trafficcontrol" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" @@ -65,6 +66,7 @@ import ( "antrea.io/antrea/pkg/monitor" ofconfig "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/ovs/ovsctl" "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/cipher" @@ -99,10 +101,12 @@ func run(o *Options) error { crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) traceflowInformer := crdInformerFactory.Crd().V1alpha1().Traceflows() egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses() + externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools() + trafficControlInformer := crdInformerFactory.Crd().V1alpha2().TrafficControls() nodeInformer := informerFactory.Core().V1().Nodes() serviceInformer := informerFactory.Core().V1().Services() endpointsInformer := informerFactory.Core().V1().Endpoints() - externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools() + namespaceInformer := informerFactory.Core().V1().Namespaces() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -504,7 +508,8 @@ func run(o *Options) error { // Initialize localPodInformer for NPLAgent, AntreaIPAMController, and secondary network controller. var localPodInformer cache.SharedIndexInformer if enableNodePortLocal || enableBridgingMode || - features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { + features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) || + features.DefaultFeatureGate.Enabled(features.TrafficControl) { listOptions := func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String() } @@ -578,6 +583,18 @@ func run(o *Options) error { go podWatchController.Run(stopCh) } + if features.DefaultFeatureGate.Enabled(features.TrafficControl) { + tcController := trafficcontrol.NewTrafficControlController(ofClient, + ifaceStore, + ovsBridgeClient, + ovsctl.NewClient(o.config.OVSBridge), + trafficControlInformer, + localPodInformer, + namespaceInformer, + podUpdateChannel) + go tcController.Run(stopCh) + } + // Start the localPodInformer if localPodInformer != nil { go localPodInformer.Run(stopCh) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 3914ed2acbd..df205b57722 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/noderoute" + "antrea.io/antrea/pkg/agent/controller/trafficcontrol" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -280,6 +281,8 @@ func (i *Initializer) initInterfaceStore() error { case interfacestore.AntreaContainer: // The port should be for a container interface. intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true) + case interfacestore.AntreaTrafficControl: + intf = trafficcontrol.ParseTrafficControlInterfaceConfig(port, ovsPort) default: klog.InfoS("Unknown Antrea interface type", "type", interfaceType) } @@ -750,7 +753,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error { externalIDs := map[string]interface{}{ interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaTunnel, } - tunnelPortUUID, err := i.ovsBridgeClient.CreateTunnelPortExt(tunnelPortName, i.networkConfig.TunnelType, config.DefaultTunOFPort, shouldEnableCsum, localIPStr, "", "", "", externalIDs) + tunnelPortUUID, err := i.ovsBridgeClient.CreateTunnelPortExt(tunnelPortName, i.networkConfig.TunnelType, config.DefaultTunOFPort, shouldEnableCsum, localIPStr, "", "", "", nil, externalIDs) if err != nil { klog.Errorf("Failed to create tunnel port %s type %s on OVS bridge: %v", tunnelPortName, i.networkConfig.TunnelType, err) return err diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 2f43246d245..61105539235 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -686,6 +686,7 @@ func (c *Controller) createIPSecTunnelPort(nodeName string, nodeIP net.IP) (int3 nodeIP.String(), remoteName, psk, + nil, ovsExternalIDs) if err != nil { return 0, fmt.Errorf("failed to create IPsec tunnel port for Node %s", nodeName) diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index 8009d61e572..fd70d204270 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -341,11 +341,11 @@ func TestCreateIPSecTunnelPortPSK(t *testing.T) { node2PortName := util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-2") c.ovsClient.EXPECT().CreateTunnelPortExt( node1PortName, ovsconfig.TunnelType("vxlan"), int32(0), - false, "", nodeIP1.String(), "", "changeme", + false, "", nodeIP1.String(), "", "changeme", nil, map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1"}).Times(1) c.ovsClient.EXPECT().CreateTunnelPortExt( node2PortName, ovsconfig.TunnelType("vxlan"), int32(0), - false, "", nodeIP2.String(), "", "changeme", + false, "", nodeIP2.String(), "", "changeme", nil, map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-2"}).Times(1) c.ovsClient.EXPECT().GetOFPort(node1PortName, false).Return(int32(1), nil) c.ovsClient.EXPECT().GetOFPort(node2PortName, false).Return(int32(2), nil) @@ -404,7 +404,7 @@ func TestCreateIPSecTunnelPortCert(t *testing.T) { node1PortName := util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-1") c.ovsClient.EXPECT().CreateTunnelPortExt( node1PortName, ovsconfig.TunnelType("vxlan"), int32(0), - false, "", nodeIP1.String(), "xyz-k8s-0-1", "", + false, "", nodeIP1.String(), "xyz-k8s-0-1", "", nil, map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1"}).Times(1) c.ovsClient.EXPECT().GetOFPort(node1PortName, false).Return(int32(1), nil) diff --git a/pkg/agent/controller/trafficcontrol/controller.go b/pkg/agent/controller/trafficcontrol/controller.go new file mode 100644 index 00000000000..0b94482e6cf --- /dev/null +++ b/pkg/agent/controller/trafficcontrol/controller.go @@ -0,0 +1,1028 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trafficcontrol + +import ( + "crypto/sha1" // #nosec G505: not used for security purposes + "encoding/binary" + "encoding/hex" + "fmt" + "net" + "reflect" + "strconv" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/crd/v1alpha2" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" + crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" + "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/ovs/ovsctl" + "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/k8s" + utilsets "antrea.io/antrea/pkg/util/sets" +) + +const ( + controllerName = "TrafficControlController" + // How long to wait before retrying the processing of a TrafficControl change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Default number of workers processing a TrafficControl change. + defaultWorkers = 4 + // Disable resyncing. + resyncPeriod time.Duration = 0 + + // Default VXLAN tunnel destination port. + defaultVXLANTunnelDestinationPort = int32(4789) + // Default GENEVE tunnel destination port. + defaultGENEVETunnelDestinationPort = int32(6081) + + portNamePrefixVXLAN = "vxlan" + portNamePrefixGENEVE = "geneve" + portNamePrefixGRE = "gre" + portNamePrefixERSPAN = "erspan" +) + +var ( + trafficControlPortExternalIDs = map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaTrafficControl, + } +) + +// trafficControlState keeps the actual state of a TrafficControl that has been realized. +type trafficControlState struct { + // The actual name of target port used by a TrafficControl. + targetPortName string + // The actual openflow port for which we have installed for a TrafficControl. + targetOFPort uint32 + // The actual name of return port used by a TrafficControl. + returnPortName string + // The actual action of a TrafficControl. + action v1alpha2.TrafficControlAction + // The actual direction of a TrafficControl. + direction v1alpha2.Direction + // The actual openflow ports for which we have installed flows for a TrafficControl. Note that, flows are only installed + // for the Pods whose effective TrafficControl is the current TrafficControl, and the ports are these Pods'. + ofPorts sets.Int32 + // The actual Pods applied with the TrafficControl. Note that, a TrafficControl can be either effective TrafficControl + // or alternative TrafficControl for these Pods. + pods sets.String +} + +// podToTCBinding keeps the TrafficControls applied to a Pod. There is only one effective TrafficControl for a Pod at any +// given time. +type podToTCBinding struct { + effectiveTC string + alternativeTCs sets.String +} + +// portToTCBinding keeps the TrafficControls using an OVS port. +type portToTCBinding struct { + interfaceConfig *interfacestore.InterfaceConfig + trafficControls sets.String +} + +type Controller struct { + ofClient openflow.Client + + portToTCBindings map[string]*portToTCBinding + ovsBridgeClient ovsconfig.OVSBridgeClient + ovsCtlClient ovsctl.OVSCtlClient + ovsPortUpdateMutex sync.Mutex + + interfaceStore interfacestore.InterfaceStore + + podInformer cache.SharedIndexInformer + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + + namespaceInformer cache.SharedIndexInformer + namespaceLister corelisters.NamespaceLister + namespaceListerSynced cache.InformerSynced + + podToTCBindings map[string]*podToTCBinding + podToTCBindingsMutex sync.RWMutex + + tcStates map[string]*trafficControlState + tcStatesMutex sync.RWMutex + + trafficControlInformer cache.SharedIndexInformer + trafficControlLister crdlisters.TrafficControlLister + trafficControlListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface +} + +func NewTrafficControlController(ofClient openflow.Client, + interfaceStore interfacestore.InterfaceStore, + ovsBridgeClient ovsconfig.OVSBridgeClient, + ovsCtlClient ovsctl.OVSCtlClient, + tcInformer crdinformers.TrafficControlInformer, + podInformer cache.SharedIndexInformer, + namespaceInformer coreinformers.NamespaceInformer, + podUpdateSubscriber channel.Subscriber) *Controller { + c := &Controller{ + ofClient: ofClient, + ovsBridgeClient: ovsBridgeClient, + ovsCtlClient: ovsCtlClient, + interfaceStore: interfaceStore, + trafficControlInformer: tcInformer.Informer(), + trafficControlLister: tcInformer.Lister(), + trafficControlListerSynced: tcInformer.Informer().HasSynced, + podInformer: podInformer, + podLister: corelisters.NewPodLister(podInformer.GetIndexer()), + podListerSynced: podInformer.HasSynced, + namespaceInformer: namespaceInformer.Informer(), + namespaceLister: namespaceInformer.Lister(), + namespaceListerSynced: namespaceInformer.Informer().HasSynced, + podToTCBindings: map[string]*podToTCBinding{}, + portToTCBindings: map[string]*portToTCBinding{}, + tcStates: map[string]*trafficControlState{}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "trafficControlGroup"), + } + c.trafficControlInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addTC, + UpdateFunc: c.updateTC, + DeleteFunc: c.deleteTC, + }, + resyncPeriod, + ) + c.podInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addPod, + UpdateFunc: c.updatePod, + DeleteFunc: c.deletePod, + }, + resyncPeriod, + ) + c.namespaceInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addNamespace, + UpdateFunc: c.updateNamespace, + DeleteFunc: nil, + }, + resyncPeriod, + ) + podUpdateSubscriber.Subscribe(c.processPodUpdate) + return c +} + +// processPodUpdate will be called when CNIServer publishes a Pod update event, and the event of TrafficControl which is +// the effective one of the Pod is triggered. +func (c *Controller) processPodUpdate(e interface{}) { + c.podToTCBindingsMutex.RLock() + defer c.podToTCBindingsMutex.RUnlock() + podEvent := e.(types.PodUpdate) + pod := k8s.NamespacedName(podEvent.PodNamespace, podEvent.PodName) + binding, exists := c.podToTCBindings[pod] + if !exists { + return + } + c.queue.Add(binding.effectiveTC) +} + +func (c *Controller) matchedPod(pod *v1.Pod, to *v1alpha2.AppliedTo) bool { + if to.NamespaceSelector == nil && to.PodSelector == nil { + return false + } + if to.NamespaceSelector != nil { + namespace, _ := c.namespaceLister.Get(pod.Namespace) + if namespace == nil { + return false + } + nsSelector, _ := metav1.LabelSelectorAsSelector(to.NamespaceSelector) + if !nsSelector.Matches(labels.Set(namespace.Labels)) { + return false + } + } + if to.PodSelector != nil { + podSelector, _ := metav1.LabelSelectorAsSelector(to.PodSelector) + if !podSelector.Matches(labels.Set(pod.Labels)) { + return false + } + } + + return true +} + +func (c *Controller) filterAffectedTCsByPod(pod *v1.Pod) sets.String { + affectedTCs := sets.NewString() + allTCs, _ := c.trafficControlLister.List(labels.Everything()) + for _, tc := range allTCs { + if c.matchedPod(pod, &tc.Spec.AppliedTo) { + affectedTCs.Insert(tc.GetName()) + } + } + return affectedTCs +} + +func (c *Controller) addPod(obj interface{}) { + pod := obj.(*v1.Pod) + if pod.Spec.HostNetwork { + return + } + affectedTCs := c.filterAffectedTCsByPod(pod) + if len(affectedTCs) == 0 { + return + } + klog.V(2).InfoS("Processing Pod ADD event", "Pod", klog.KObj(pod)) + for affectedTC := range affectedTCs { + c.queue.Add(affectedTC) + } +} + +func (c *Controller) updatePod(oldObj interface{}, obj interface{}) { + oldPod := oldObj.(*v1.Pod) + pod := obj.(*v1.Pod) + if pod.Spec.HostNetwork { + return + } + if reflect.DeepEqual(pod.GetLabels(), oldPod.GetLabels()) { + return + } + oldAffectedTCs := c.filterAffectedTCsByPod(oldPod) + nowAffectedTCs := c.filterAffectedTCsByPod(pod) + affectedTCs := utilsets.SymmetricDifferenceString(oldAffectedTCs, nowAffectedTCs) + if len(affectedTCs) == 0 { + return + } + klog.V(2).InfoS("Processing Pod UPDATE event", "Pod", klog.KObj(pod)) + for affectedTC := range affectedTCs { + c.queue.Add(affectedTC) + } +} + +func (c *Controller) deletePod(obj interface{}) { + pod := obj.(*v1.Pod) + if pod.Spec.HostNetwork { + return + } + affectedTCs := c.filterAffectedTCsByPod(pod) + if len(affectedTCs) == 0 { + return + } + klog.V(2).InfoS("Processing Pod DELETE event", "Pod", klog.KObj(pod)) + for affectedTC := range affectedTCs { + c.queue.Add(affectedTC) + } +} + +func matchedNamespace(namespace *v1.Namespace, to *v1alpha2.AppliedTo) bool { + if to.NamespaceSelector != nil { + nsSelector, _ := metav1.LabelSelectorAsSelector(to.NamespaceSelector) + if !nsSelector.Matches(labels.Set(namespace.Labels)) { + return false + } + } + return true +} + +func (c *Controller) filterAffectedTCsByNS(namespace *v1.Namespace) sets.String { + affectedTCs := sets.NewString() + allTCs, _ := c.trafficControlLister.List(labels.Everything()) + for _, tc := range allTCs { + if matchedNamespace(namespace, &tc.Spec.AppliedTo) { + affectedTCs.Insert(tc.GetName()) + } + } + return affectedTCs +} + +func (c *Controller) addNamespace(obj interface{}) { + ns := obj.(*v1.Namespace) + affectedTCs := c.filterAffectedTCsByNS(ns) + if len(affectedTCs) == 0 { + return + } + klog.V(2).InfoS("Processing Namespace ADD event", "Namespace", klog.KObj(ns)) + for tc := range affectedTCs { + c.queue.Add(tc) + } +} + +func (c *Controller) updateNamespace(oldObj, obj interface{}) { + oldNS := oldObj.(*v1.Namespace) + ns := obj.(*v1.Namespace) + if reflect.DeepEqual(oldNS.GetLabels(), ns.GetLabels()) { + return + } + oldAffectedTCs := c.filterAffectedTCsByNS(oldNS) + nowAffectedTCs := c.filterAffectedTCsByNS(ns) + affectedTCs := utilsets.SymmetricDifferenceString(oldAffectedTCs, nowAffectedTCs) + if len(affectedTCs) == 0 { + return + } + klog.V(2).InfoS("Processing Namespace UPDATE event", "Namespace", klog.KObj(ns)) + for tc := range affectedTCs { + c.queue.Add(tc) + } +} + +func (c *Controller) addTC(obj interface{}) { + tc := obj.(*v1alpha2.TrafficControl) + klog.V(2).InfoS("Processing TrafficControl ADD event", "TrafficControl", klog.KObj(tc)) + c.queue.Add(tc.Name) +} + +func (c *Controller) updateTC(oldObj interface{}, obj interface{}) { + oldTC := oldObj.(*v1alpha2.TrafficControl) + tc := obj.(*v1alpha2.TrafficControl) + if tc.GetGeneration() != oldTC.GetGeneration() { + klog.V(2).InfoS("Processing TrafficControl UPDATE event", "TrafficControl", klog.KObj(tc)) + c.queue.Add(tc.Name) + } +} + +func (c *Controller) deleteTC(obj interface{}) { + tc := obj.(*v1alpha2.TrafficControl) + klog.V(2).InfoS("Processing TrafficControl DELETE event", "TrafficControl", klog.KObj(tc)) + c.queue.Add(tc.Name) +} + +func (c *Controller) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.InfoS("Starting", "controllerName", controllerName) + defer klog.InfoS("Shutting down", "controllerName", controllerName) + + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.trafficControlListerSynced, c.podListerSynced, c.namespaceListerSynced) { + return + } + + for i := 0; i < defaultWorkers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + + <-stopCh +} + +func (c *Controller) worker() { + for c.processNextWorkItem() { + } +} + +func (c *Controller) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + if key, ok := obj.(string); !ok { + // As the item in the work queue is actually invalid, we call Forget here else we'd + // go into a loop of attempting to process a work item that is invalid. + // This should not happen. + c.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncTrafficControl(key); err == nil { + // If no error occurs we Forget this item, so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the work queue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.ErrorS(err, "Syncing TrafficControl failed, requeue", "TrafficControl", key) + } + return true +} + +func (c *Controller) newTrafficControlState(tcName string, action v1alpha2.TrafficControlAction, direction v1alpha2.Direction) *trafficControlState { + c.tcStatesMutex.Lock() + defer c.tcStatesMutex.Unlock() + state := &trafficControlState{ + pods: sets.NewString(), + ofPorts: sets.NewInt32(), + action: action, + direction: direction, + } + c.tcStates[tcName] = state + return state +} + +func (c *Controller) getTrafficControlState(tcName string) (*trafficControlState, bool) { + c.tcStatesMutex.RLock() + defer c.tcStatesMutex.RUnlock() + state, exists := c.tcStates[tcName] + return state, exists +} + +func (c *Controller) deleteTrafficControlState(tcName string) { + c.tcStatesMutex.Lock() + defer c.tcStatesMutex.Unlock() + delete(c.tcStates, tcName) +} + +func (c *Controller) filterPods(appliedTo *v1alpha2.AppliedTo) ([]*v1.Pod, error) { + // If both selectors are nil, no Pod should be selected. + if appliedTo.PodSelector == nil && appliedTo.NamespaceSelector == nil { + return nil, nil + } + var podSelector, nsSelector labels.Selector + var err error + var selectedPods []*v1.Pod + + if appliedTo.PodSelector != nil { + // If Pod selector is not nil, use it to select Pods. + podSelector, err = metav1.LabelSelectorAsSelector(appliedTo.PodSelector) + if err != nil { + return nil, err + } + } else { + // If Pod selector is nil, then Namespace selector will not be nil, select all Pods from the selected Namespaces. + podSelector = labels.Everything() + } + + if appliedTo.NamespaceSelector != nil { + // If Namespace selector is not nil, use it to select Namespaces. + var namespaces []*v1.Namespace + nsSelector, err = metav1.LabelSelectorAsSelector(appliedTo.NamespaceSelector) + if err != nil { + return nil, err + } + namespaces, err = c.namespaceLister.List(nsSelector) + if err != nil { + return nil, err + } + // Select Pods with Pod selector from the selected Namespaces. + for _, ns := range namespaces { + pods, err := c.podLister.Pods(ns.Name).List(podSelector) + if err != nil { + return nil, err + } + selectedPods = append(selectedPods, pods...) + } + } else { + // If Namespace selector is nil, use Pod selector to select Pods from all Namespaces. + selectedPods, err = c.podLister.List(podSelector) + if err != nil { + return nil, err + } + } + + var nonHostNetworkPods []*v1.Pod + // TrafficControl does not support host network Pods. + for _, pod := range selectedPods { + if !pod.Spec.HostNetwork { + nonHostNetworkPods = append(nonHostNetworkPods, pod) + } + } + + return nonHostNetworkPods, nil +} + +func genVXLANPortName(tunnel *v1alpha2.UDPTunnel) string { + hash := sha1.New() // #nosec G401: not used for security purposes + hash.Write(net.ParseIP(tunnel.RemoteIP)) + + destinationPort := defaultVXLANTunnelDestinationPort + if tunnel.DestinationPort != nil { + destinationPort = *tunnel.DestinationPort + } + binary.Write(hash, binary.BigEndian, destinationPort) + var vni int32 + if tunnel.VNI != nil { + vni = *tunnel.VNI + } + binary.Write(hash, binary.BigEndian, vni) + return fmt.Sprintf("%s-%s", portNamePrefixVXLAN, hex.EncodeToString(hash.Sum(nil))[:6]) +} + +func genGENEVEPortName(tunnel *v1alpha2.UDPTunnel) string { + hash := sha1.New() // #nosec G401: not used for security purposes + hash.Write(net.ParseIP(tunnel.RemoteIP)) + + destinationPort := defaultGENEVETunnelDestinationPort + if tunnel.DestinationPort != nil { + destinationPort = *tunnel.DestinationPort + } + binary.Write(hash, binary.BigEndian, destinationPort) + var vni int32 + if tunnel.VNI != nil { + vni = *tunnel.VNI + } + binary.Write(hash, binary.BigEndian, vni) + return fmt.Sprintf("%s-%s", portNamePrefixGENEVE, hex.EncodeToString(hash.Sum(nil))[:6]) +} + +func genGREPortName(tunnel *v1alpha2.GRETunnel) string { + hash := sha1.New() // #nosec G401: not used for security purposes + hash.Write(net.ParseIP(tunnel.RemoteIP)) + + var key int32 + if tunnel.Key != nil { + key = *tunnel.Key + } + binary.Write(hash, binary.BigEndian, key) + return fmt.Sprintf("%s-%s", portNamePrefixGRE, hex.EncodeToString(hash.Sum(nil))[:6]) +} + +// genERSPANPortName generates a port name for the given ERSPAN tunnel. +// Note that ERSPAN tunnel's uniqueness is based on the remote IP and the session ID only, which means if there are two +// tunnels having same remote IP and session ID but different other attributes, creating the second port would fail in +// OVS. +func genERSPANPortName(tunnel *v1alpha2.ERSPANTunnel) string { + hash := sha1.New() // #nosec G401: not used for security purposes + hash.Write(net.ParseIP(tunnel.RemoteIP)) + + var sessionID, index, dir, hardwareID int32 + if tunnel.SessionID != nil { + sessionID = *tunnel.SessionID + } + if tunnel.Index != nil { + index = *tunnel.Index + } + if tunnel.Dir != nil { + dir = *tunnel.Dir + } + if tunnel.HardwareID != nil { + hardwareID = *tunnel.HardwareID + } + binary.Write(hash, binary.BigEndian, sessionID) + binary.Write(hash, binary.BigEndian, tunnel.Version) + binary.Write(hash, binary.BigEndian, index) + binary.Write(hash, binary.BigEndian, dir) + binary.Write(hash, binary.BigEndian, hardwareID) + return fmt.Sprintf("%s-%s", portNamePrefixERSPAN, hex.EncodeToString(hash.Sum(nil))[:6]) +} + +func ParseTrafficControlInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig { + return &interfacestore.InterfaceConfig{ + Type: interfacestore.TrafficControlInterface, + InterfaceName: portData.Name, + OVSPortConfig: portConfig} +} + +// createOVSInternalPort creates an OVS internal port on OVS and corresponding interface on host. Note that, host interface +// might not be available immediately after creating OVS internal port. +func (c *Controller) createOVSInternalPort(portName string) (string, error) { + portUUID, err := c.ovsBridgeClient.CreateInternalPort(portName, 0, trafficControlPortExternalIDs) + if err != nil { + return "", err + } + if pollErr := wait.PollImmediate(time.Second, 5*time.Second, func() (bool, error) { + _, _, err := util.SetLinkUp(portName) + if err == nil { + return true, nil + } + if _, ok := err.(util.LinkNotFound); ok { + return false, nil + } + return false, err + }); pollErr != nil { + return "", pollErr + } + return portUUID, nil +} + +func (c *Controller) createUDPTunnelPort(portName string, tunnelType ovsconfig.TunnelType, tunnelConfig *v1alpha2.UDPTunnel) (string, error) { + extraOptions := map[string]interface{}{} + if tunnelConfig.DestinationPort != nil { + extraOptions["dst_port"] = strconv.Itoa(int(*tunnelConfig.DestinationPort)) + } + if tunnelConfig.VNI != nil { + extraOptions["key"] = strconv.Itoa(int(*tunnelConfig.VNI)) + } + portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(portName, + tunnelType, + 0, + false, + "", + tunnelConfig.RemoteIP, + "", + "", + extraOptions, + trafficControlPortExternalIDs) + return portUUID, err +} + +func (c *Controller) createGREPort(portName string, tunnelConfig *v1alpha2.GRETunnel) (string, error) { + extraOptions := map[string]interface{}{} + if tunnelConfig.Key != nil { + extraOptions["key"] = strconv.Itoa(int(*tunnelConfig.Key)) + } + portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(portName, + ovsconfig.GRETunnel, + 0, + false, + "", + tunnelConfig.RemoteIP, + "", + "", + extraOptions, + trafficControlPortExternalIDs) + return portUUID, err +} + +func (c *Controller) createERSPANPort(portName string, tunnelConfig *v1alpha2.ERSPANTunnel) (string, error) { + extraOptions := make(map[string]interface{}) + extraOptions["erspan_ver"] = strconv.Itoa(int(tunnelConfig.Version)) + if tunnelConfig.SessionID != nil { + extraOptions["key"] = strconv.Itoa(int(*tunnelConfig.SessionID)) + } + if tunnelConfig.Version == 1 { + if tunnelConfig.Index != nil { + extraOptions["erspan_idx"] = strconv.FormatInt(int64(*tunnelConfig.Index), 16) + } + } else if tunnelConfig.Version == 2 { + if tunnelConfig.Dir != nil { + extraOptions["erspan_dir"] = strconv.Itoa(int(*tunnelConfig.Dir)) + } + if tunnelConfig.HardwareID != nil { + extraOptions["erspan_hwid"] = strconv.Itoa(int(*tunnelConfig.HardwareID)) + } + } + portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(portName, + ovsconfig.ERSPANTunnel, + 0, + false, + "", + tunnelConfig.RemoteIP, + "", + "", + extraOptions, + trafficControlPortExternalIDs) + return portUUID, err +} + +func (c *Controller) getPortName(port *v1alpha2.TrafficControlPort) string { + var portName string + switch { + case port.OVSInternal != nil: + portName = port.OVSInternal.Name + case port.Device != nil: + portName = port.Device.Name + case port.VXLAN != nil: + portName = genVXLANPortName(port.VXLAN) + case port.GENEVE != nil: + portName = genGENEVEPortName(port.GENEVE) + case port.GRE != nil: + portName = genGREPortName(port.GRE) + case port.ERSPAN != nil: + portName = genERSPANPortName(port.ERSPAN) + } + return portName +} + +// getOrCreateTrafficControlPort ensures that there is an OVS port for the given TrafficControlPort and binds the port +// to the TrafficControl. The OVS port will be created if the port doesn't exist. It returns the ofPort of the OVS port +// on success, an error if there is. +func (c *Controller) getOrCreateTrafficControlPort(port *v1alpha2.TrafficControlPort, portName, tcName string, isReturnPort bool) (uint32, error) { + c.ovsPortUpdateMutex.Lock() + defer c.ovsPortUpdateMutex.Unlock() + + // Query the port binding information from portToTCBindings. If the corresponding binding information exists, indicating + // that the port has been created, then insert the TrafficControl to the set of TrafficControls using the port. + if binding, exists := c.portToTCBindings[portName]; exists { + c.portToTCBindings[portName].trafficControls.Insert(tcName) + return uint32(binding.interfaceConfig.OFPort), nil + } + + // If there is no binding information of the port in portToTCBindings, query the interface store. If corresponding + // config is found, create binding information for the port. Note that, this is used to rebuild portToTCBindings + // after restarting Antrea Agent. + if itf, ok := c.interfaceStore.GetInterfaceByName(portName); ok { + // If the port is a return port, although the port is not newly created here, return flow should be installed for + // the port when it is used by a TrafficControl for the first time. + if isReturnPort { + if err := c.ofClient.InstallTrafficControlReturnPortFlow(uint32(itf.OFPort)); err != nil { + return 0, err + } + } + c.portToTCBindings[portName] = &portToTCBinding{ + interfaceConfig: itf, + trafficControls: sets.NewString(tcName), + } + return uint32(itf.OFPort), nil + } + + var portUUID string + var err error + + switch { + case port.OVSInternal != nil: + portUUID, err = c.createOVSInternalPort(portName) + case port.Device != nil: + portUUID, err = c.ovsBridgeClient.CreatePort(portName, portName, trafficControlPortExternalIDs) + case port.VXLAN != nil: + portUUID, err = c.createUDPTunnelPort(portName, ovsconfig.VXLANTunnel, port.VXLAN) + case port.GENEVE != nil: + portUUID, err = c.createUDPTunnelPort(portName, ovsconfig.GeneveTunnel, port.GENEVE) + case port.GRE != nil: + portUUID, err = c.createGREPort(portName, port.GRE) + case port.ERSPAN != nil: + portUUID, err = c.createERSPANPort(portName, port.ERSPAN) + } + + if err != nil { + return 0, err + } + + ofPort, err := c.ovsBridgeClient.GetOFPort(portName, false) + if err != nil { + return 0, err + } + // Set the port with no-flood to reject ARP flood packets. + if err = c.ovsCtlClient.SetPortNoFlood(int(ofPort)); err != nil { + return 0, fmt.Errorf("failed to set port %s with no-flood config: %w", portName, err) + } + + // If the port is a return port and is newly created, install a return flow for the port. + if isReturnPort { + if err = c.ofClient.InstallTrafficControlReturnPortFlow(uint32(ofPort)); err != nil { + return 0, err + } + } + itf := interfacestore.NewTrafficControlInterface(portName) + itf.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPort} + c.interfaceStore.AddInterface(itf) + // Create binding for the newly created port. + c.portToTCBindings[portName] = &portToTCBinding{ + interfaceConfig: itf, + trafficControls: sets.NewString(tcName), + } + return uint32(ofPort), nil +} + +// releaseTrafficControlPort releases the port from the TrafficControl and deletes the port if it is no longer used by +// any TrafficControl. +func (c *Controller) releaseTrafficControlPort(portName, tcName string, isReturnPort bool) error { + c.ovsPortUpdateMutex.Lock() + defer c.ovsPortUpdateMutex.Unlock() + portBinding, exists := c.portToTCBindings[portName] + if !exists { + klog.InfoS("Port used by TrafficControl has been deleted", "port", portName, "TrafficControl", tcName) + return nil + } + + portBinding.trafficControls.Delete(tcName) + if len(portBinding.trafficControls) == 0 { + // If the port is no longer used by any TrafficControl, delete the port. + if err := c.ovsBridgeClient.DeletePort(portBinding.interfaceConfig.PortUUID); err != nil { + return err + } + // Uninstall corresponding return flow if the port is a return port. + if isReturnPort { + if err := c.ofClient.UninstallTrafficControlReturnPortFlow(uint32(portBinding.interfaceConfig.OFPort)); err != nil { + return err + } + } + c.interfaceStore.DeleteInterface(portBinding.interfaceConfig) + delete(c.portToTCBindings, portName) + } + return nil +} + +func (c *Controller) syncTrafficControl(tcName string) error { + startTime := time.Now() + defer func() { + klog.V(2).InfoS("Finished syncing TrafficControl", "TrafficControl", tcName, "durationTime", time.Since(startTime)) + }() + + tc, err := c.trafficControlLister.Get(tcName) + if err != nil { + if apierrors.IsNotFound(err) { + // If the TrafficControl is deleted and the corresponding state doesn't exist, just return. + tcState, exists := c.getTrafficControlState(tcName) + if !exists { + return nil + } + // If a TrafficControl is deleted but the corresponding state exists, do some cleanup for the deleted + // TrafficControl. + if err = c.uninstallTrafficControl(tcName, tcState); err != nil { + return err + } + // Delete the state of the deleted TrafficControl. + c.deleteTrafficControlState(tcName) + return nil + } + return err + } + + // Get the TrafficControl state. + tcState, exists := c.getTrafficControlState(tcName) + // If the TrafficControl exists and corresponding state doesn't exist, create state for the TrafficControl. + if !exists { + tcState = c.newTrafficControlState(tcName, tc.Spec.Action, tc.Spec.Direction) + } + + if tc.Spec.ReturnPort != nil { + // Get name of the return port. + returnPortName := c.getPortName(tc.Spec.ReturnPort) + // If the name is different from the cached name in the TrafficControl state, it could be caused by the return + // port update of the TrafficControl or the creation of the TrafficControl. + if returnPortName != tcState.returnPortName { + if tcState.returnPortName != "" { + // If the stale return port name cached in TrafficControl state is not empty, release the stale return port + // from the TrafficControl. + if err = c.releaseTrafficControlPort(returnPortName, tcName, true); err != nil { + return err + } + } + // Get or create the return port. + if _, err = c.getOrCreateTrafficControlPort(tc.Spec.ReturnPort, returnPortName, tcName, true); err != nil { + return err + } + // Update return port name in state. + tcState.returnPortName = returnPortName + } + } + + // Get name of the target port. + targetPortName := c.getPortName(&tc.Spec.TargetPort) + // If the name is different from the cached name in the TrafficControl state, it could be caused by the target port + // update of the TrafficControl or the creation of the TrafficControl. + if targetPortName != tcState.targetPortName { + if tcState.targetPortName != "" { + // If the stale target port name cached in TrafficControl state is not empty, release the stale target port + // from the TrafficControl. + if err = c.releaseTrafficControlPort(tcState.targetPortName, tcName, false); err != nil { + return err + } + } + // Update target port name in state. + tcState.targetPortName = targetPortName + } + + // Get or create the target port. + targetOFPort, err := c.getOrCreateTrafficControlPort(&tc.Spec.TargetPort, targetPortName, tcName, false) + if err != nil { + return err + } + + // Check if the mark flows should be updated. + var needUpdateMarkFlows bool + if tcState.targetOFPort != targetOFPort || tcState.action != tc.Spec.Action || tcState.direction != tc.Spec.Direction { + needUpdateMarkFlows = true + } + + // Get the list of Pods applying to the TrafficControl. + var pods []*v1.Pod + if pods, err = c.filterPods(&tc.Spec.AppliedTo); err != nil { + return err + } + + stalePods := tcState.pods.Union(nil) + newPods := sets.NewString() + newOfPorts := sets.NewInt32() + for _, pod := range pods { + podNN := k8s.NamespacedName(pod.Namespace, pod.Name) + newPods.Insert(podNN) + stalePods.Delete(podNN) + + // If the TrafficControl is not the effective TrafficControl for the Pod, do nothing. + if !c.bindPodToTrafficControl(podNN, tcName) { + continue + } + + // If the TrafficControl is the effective TrafficControl for the Pod, insert the port to the new set in + // TrafficControl state. + podInterfaces := c.interfaceStore.GetContainerInterfacesByPod(pod.Name, pod.Namespace) + if len(podInterfaces) == 0 { + klog.InfoS("Interfaces of Pod not found", "Pod", klog.KObj(pod)) + continue + } + newOfPorts.Insert(podInterfaces[0].OFPort) + } + + // If target ofPort / direction / action in TrafficControl is updated, the mark flows should be reinstalled; if the + // new ofPort set is different from the old ofPort set, the mark flows should be also reinstalled. + if needUpdateMarkFlows || !newOfPorts.Equal(tcState.ofPorts) { + var ofPorts []uint32 + for _, port := range newOfPorts.List() { + ofPorts = append(ofPorts, uint32(port)) + } + if err = c.ofClient.InstallTrafficControlMarkFlows(tc.Name, ofPorts, targetOFPort, tc.Spec.Direction, tc.Spec.Action); err != nil { + return err + } + } + // Update TrafficControl state. + tcState.pods = newPods + tcState.ofPorts = newOfPorts + tcState.targetOFPort = targetOFPort + tcState.action = tc.Spec.Action + tcState.direction = tc.Spec.Direction + + if len(stalePods) != 0 { + // Resync the Pods applying to the TrafficControl to be deleted. + c.podsResync(stalePods, tcName) + } + + return nil +} + +func (c *Controller) uninstallTrafficControl(tcName string, tcState *trafficControlState) error { + // Uninstall the mark flows of the TrafficControl. + if err := c.ofClient.UninstallTrafficControlMarkFlows(tcName); err != nil { + return err + } + + // Release the target port from the deleted TrafficControl. + if tcState.targetPortName != "" { + if err := c.releaseTrafficControlPort(tcState.targetPortName, tcName, false); err != nil { + return err + } + } + // Release the return port from the deleted TrafficControl. + if tcState.returnPortName != "" { + if err := c.releaseTrafficControlPort(tcState.returnPortName, tcName, true); err != nil { + return err + } + } + // Resync the Pods applying to the deleted TrafficControl. + if len(tcState.pods) != 0 { + c.podsResync(tcState.pods, tcName) + } + return nil +} + +func (c *Controller) podsResync(pods sets.String, tcName string) { + // Resync the Pods that have new effective TrafficControl. + newEffectiveTCs := sets.NewString() + for pod := range pods { + if newEffectiveTC := c.unbindPodFromTrafficControl(pod, tcName); newEffectiveTC != "" { + newEffectiveTCs.Insert(newEffectiveTC) + } + } + // Trigger resyncing of the new effective TrafficControls of the Pods. + for tc := range newEffectiveTCs { + c.queue.Add(tc) + } +} + +// bindPodToTrafficControl binds the Pod with the TrafficControl and returns whether this TrafficControl is the effective +// one for the Pod. +func (c *Controller) bindPodToTrafficControl(pod, tc string) bool { + c.podToTCBindingsMutex.Lock() + defer c.podToTCBindingsMutex.Unlock() + + binding, exists := c.podToTCBindings[pod] + if !exists { + // Promote itself as the effective TrafficControl for the Pod if there is no binding information for the Pod. + c.podToTCBindings[pod] = &podToTCBinding{ + effectiveTC: tc, + alternativeTCs: sets.NewString(), + } + return true + } + if binding.effectiveTC == tc { + return true + } + if !binding.alternativeTCs.Has(tc) { + binding.alternativeTCs.Insert(tc) + } + return false +} + +// unbindPodFromTrafficControl unbinds the Pod with the TrafficControl. If the unbound TrafficControl was the effective +// one for the Pod and there are alternative ones, it will return the new effective TrafficControl, otherwise return empty +// string. +func (c *Controller) unbindPodFromTrafficControl(pod, tcName string) string { + c.podToTCBindingsMutex.Lock() + defer c.podToTCBindingsMutex.Unlock() + + // The binding must exist. + binding := c.podToTCBindings[pod] + if binding.effectiveTC == tcName { + var popped bool + // Select a new effective TrafficControl. + binding.effectiveTC, popped = binding.alternativeTCs.PopAny() + if !popped { + // Remove the binding information for the Pod if there is no alternative TrafficControls. + delete(c.podToTCBindings, pod) + return "" + } + return binding.effectiveTC + } + binding.alternativeTCs.Delete(tcName) + return "" +} diff --git a/pkg/agent/controller/trafficcontrol/controller_test.go b/pkg/agent/controller/trafficcontrol/controller_test.go new file mode 100644 index 00000000000..59fc7da73fd --- /dev/null +++ b/pkg/agent/controller/trafficcontrol/controller_test.go @@ -0,0 +1,1349 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trafficcontrol + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + + "antrea.io/antrea/pkg/agent/interfacestore" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/crd/v1alpha2" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/ovs/ovsconfig" + ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing" + "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/k8s" +) + +type fakeController struct { + *Controller + mockController *gomock.Controller + mockOFClient *openflowtest.MockClient + mockOVSCtlClient *ovsctltest.MockOVSCtlClient + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient + crdClient *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + client *fake.Clientset + informerFactory informers.SharedInformerFactory + localPodInformer cache.SharedIndexInformer + podUpdateChannel *channel.SubscribableChannel +} + +var ( + labels1 = map[string]string{"app1": "foo1"} + labels2 = map[string]string{"app2": "foo2"} + labels3 = map[string]string{"app3": "foo3"} + + targetPort1Name = "target-port1" + returnPort1Name = "return-port1" + targetPort2Name = "target-port2" + returnPort2Name = "return-port2" + targetPort3Name = "target-port3" + + ns1 = newNamespace("ns1", labels1) + ns2 = newNamespace("ns2", labels2) + + targetPort1 = &v1alpha2.NetworkDevice{Name: targetPort1Name} + returnPort1 = &v1alpha2.NetworkDevice{Name: returnPort1Name} + targetPort2 = &v1alpha2.NetworkDevice{Name: targetPort2Name} + returnPort2 = &v1alpha2.NetworkDevice{Name: returnPort2Name} + targetPort3 = &v1alpha2.NetworkDevice{Name: targetPort3Name} + + pod1 = newPod("ns1", "pod1", "fakeNode", labels1) + pod2 = newPod("ns1", "pod2", "fakeNode", labels2) + pod3 = newPod("ns2", "pod3", "fakeNode", labels1) + pod4 = newPod("ns2", "pod4", "fakeNode", labels2) + + pod1NN = k8s.NamespacedName("ns1", "pod1") + pod2NN = k8s.NamespacedName("ns1", "pod2") + pod3NN = k8s.NamespacedName("ns2", "pod3") + pod4NN = k8s.NamespacedName("ns2", "pod4") + + pod1OFPort = uint32(1) + pod2OFPort = uint32(2) + pod3OFPort = uint32(3) + pod4OFPort = uint32(4) + targetPort1OFPort = uint32(5) + targetPort2OFPort = uint32(7) + returnPort2OFPort = uint32(8) + targetPort3OFPort = uint32(9) + + podInterface1 = newPodInterface("ns1", "pod1", int32(pod1OFPort)) + podInterface2 = newPodInterface("ns1", "pod2", int32(pod2OFPort)) + podInterface3 = newPodInterface("ns2", "pod3", int32(pod3OFPort)) + podInterface4 = newPodInterface("ns2", "pod4", int32(pod4OFPort)) + targetInterface1 = newTrafficControlInterface(targetPort1Name, int32(targetPort1OFPort)) + targetInterface2 = newTrafficControlInterface(targetPort2Name, int32(targetPort2OFPort)) + returnInterface2 = newTrafficControlInterface(returnPort2Name, int32(returnPort2OFPort)) + targetInterface3 = newTrafficControlInterface(targetPort3Name, int32(targetPort3OFPort)) + + tc1Name = "test-tc1" + tc2Name = "test-tc2" + tc3Name = "test-tc3" + + directionIngress = v1alpha2.DirectionIngress + directionEgress = v1alpha2.DirectionEgress + actionMirror = v1alpha2.ActionMirror + actionRedirect = v1alpha2.ActionRedirect + + externalIDs = map[string]interface{}{interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaTrafficControl} +) + +func newFakeController(t *testing.T, objects []runtime.Object, initObjects []runtime.Object, interfaces []*interfacestore.InterfaceConfig) *fakeController { + controller := gomock.NewController(t) + mockOFClient := openflowtest.NewMockClient(controller) + mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(controller) + mockOVSCtlClient := ovsctltest.NewMockOVSCtlClient(controller) + + client := fake.NewSimpleClientset(objects...) + crdClient := fakeversioned.NewSimpleClientset(initObjects...) + + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) + tcInformer := crdInformerFactory.Crd().V1alpha2().TrafficControls() + informerFactory := informers.NewSharedInformerFactory(client, 0) + nsInformer := informerFactory.Core().V1().Namespaces() + + localPodInformer := coreinformers.NewFilteredPodInformer( + client, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", "fakeNode1").String() + }, + ) + + ifaceStore := interfacestore.NewInterfaceStore() + for _, itf := range interfaces { + ifaceStore.AddInterface(itf) + } + + podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + tcController := NewTrafficControlController(mockOFClient, ifaceStore, mockOVSBridgeClient, mockOVSCtlClient, tcInformer, localPodInformer, nsInformer, podUpdateChannel) + podUpdateChannel.Subscribe(tcController.processPodUpdate) + + return &fakeController{ + Controller: tcController, + mockController: controller, + mockOFClient: mockOFClient, + mockOVSBridgeClient: mockOVSBridgeClient, + mockOVSCtlClient: mockOVSCtlClient, + crdClient: crdClient, + crdInformerFactory: crdInformerFactory, + client: client, + informerFactory: informerFactory, + localPodInformer: localPodInformer, + podUpdateChannel: podUpdateChannel, + } +} + +func newPod(ns, name, nodeName string, labels map[string]string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + Labels: labels, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + } +} + +func newPodInterface(podNamespace, podName string, ofPort int32) *interfacestore.InterfaceConfig { + containerName := k8s.NamespacedName(podNamespace, podName) + return &interfacestore.InterfaceConfig{ + InterfaceName: util.GenerateContainerInterfaceName(podName, podNamespace, containerName), + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: podName, PodNamespace: podNamespace, ContainerID: containerName}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: ofPort}, + } +} + +func newTrafficControlInterface(interfaceName string, ofPort int32) *interfacestore.InterfaceConfig { + return &interfacestore.InterfaceConfig{ + Type: interfacestore.TrafficControlInterface, + InterfaceName: interfaceName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: ofPort, PortUUID: interfaceName}, + TunnelInterfaceConfig: &interfacestore.TunnelInterfaceConfig{}, + } +} + +func newNamespace(ns string, labels map[string]string) *v1.Namespace { + return &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: ns, + Labels: labels, + }, + } +} + +func generateTrafficControl(name string, + nsSelector, + podSelector map[string]string, + direction v1alpha2.Direction, + action v1alpha2.TrafficControlAction, + targetPort interface{}, + isTargetPortVXLAN bool, + returnPort interface{}) *v1alpha2.TrafficControl { + tc := &v1alpha2.TrafficControl{ + ObjectMeta: metav1.ObjectMeta{Name: name, UID: "test-uid"}, + Spec: v1alpha2.TrafficControlSpec{ + Direction: direction, + Action: action, + ReturnPort: &v1alpha2.TrafficControlPort{}, + }} + if nsSelector != nil { + tc.Spec.AppliedTo.NamespaceSelector = &metav1.LabelSelector{MatchLabels: nsSelector} + } + if podSelector != nil { + tc.Spec.AppliedTo.PodSelector = &metav1.LabelSelector{MatchLabels: podSelector} + } + switch targetPort.(type) { + case *v1alpha2.OVSInternalPort: + tc.Spec.TargetPort.OVSInternal = targetPort.(*v1alpha2.OVSInternalPort) + case *v1alpha2.NetworkDevice: + tc.Spec.TargetPort.Device = targetPort.(*v1alpha2.NetworkDevice) + case *v1alpha2.UDPTunnel: + if isTargetPortVXLAN { + tc.Spec.TargetPort.VXLAN = targetPort.(*v1alpha2.UDPTunnel) + } else { + tc.Spec.TargetPort.GENEVE = targetPort.(*v1alpha2.UDPTunnel) + } + case *v1alpha2.GRETunnel: + tc.Spec.TargetPort.GRE = targetPort.(*v1alpha2.GRETunnel) + case *v1alpha2.ERSPANTunnel: + tc.Spec.TargetPort.ERSPAN = targetPort.(*v1alpha2.ERSPANTunnel) + } + + switch returnPort.(type) { + case *v1alpha2.OVSInternalPort: + tc.Spec.ReturnPort.OVSInternal = returnPort.(*v1alpha2.OVSInternalPort) + case *v1alpha2.NetworkDevice: + tc.Spec.ReturnPort.Device = returnPort.(*v1alpha2.NetworkDevice) + default: + tc.Spec.ReturnPort = nil + } + return tc +} + +func generateTrafficControlState(direction v1alpha2.Direction, + action v1alpha2.TrafficControlAction, + targetPortName string, + targetOFPort uint32, + returnPortName string, + ofPorts sets.Int32, + pods sets.String) *trafficControlState { + return &trafficControlState{ + targetPortName: targetPortName, + targetOFPort: targetOFPort, + returnPortName: returnPortName, + action: action, + direction: direction, + ofPorts: ofPorts, + pods: pods, + } +} + +func waitEvents(t *testing.T, expectedEvents int, c *fakeController) { + require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { + return c.queue.Len() == expectedEvents, nil + })) +} + +func TestTrafficControlAdd(t *testing.T) { + destinationPort := int32(1234) + vni := int32(1) + greKey := int32(2222) + remoteIP := "1.1.1.1" + erspanDir := int32(1) + erspanHwID := int32(1) + + networkDeviceName := "non-existing-port" + networkDevice := &v1alpha2.NetworkDevice{Name: networkDeviceName} + udpTunnel := &v1alpha2.UDPTunnel{RemoteIP: remoteIP, VNI: &vni, DestinationPort: &destinationPort} + greTunnel := &v1alpha2.GRETunnel{RemoteIP: remoteIP, Key: &greKey} + erspanTunnel := &v1alpha2.ERSPANTunnel{Version: 2, RemoteIP: remoteIP, Dir: &erspanDir, HardwareID: &erspanHwID} + + interfaces := []*interfacestore.InterfaceConfig{ + podInterface1, + podInterface2, + podInterface3, + podInterface4, + } + + testcases := []struct { + name string + tc *v1alpha2.TrafficControl + extraInterfaces []*interfacestore.InterfaceConfig + portToTCBindings map[string]*portToTCBinding + expectedCalls func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + MockOVSCtlClient *ovsctltest.MockOVSCtlClient) + }{ + { + name: "Add TrafficControl with non-existing target port (NetworkDevice)", + tc: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, networkDevice, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOVSBridgeClient.EXPECT().CreatePort(networkDeviceName, networkDeviceName, externalIDs) + mockOVSBridgeClient.EXPECT().GetOFPort(networkDeviceName, false) + mockOVSCtlClient.EXPECT().SetPortNoFlood(0) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), gomock.Any(), directionIngress, actionMirror) + }, + }, + { + name: "Add TrafficControl with non-existing target port (VXLAN)", + tc: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, udpTunnel, true, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + extraOptions := map[string]interface{}{"key": strconv.Itoa(int(vni)), "dst_port": strconv.Itoa(int(destinationPort))} + + mockOVSBridgeClient.EXPECT().CreateTunnelPortExt(gomock.Any(), ovsconfig.TunnelType(ovsconfig.VXLANTunnel), int32(0), false, "", remoteIP, "", "", extraOptions, externalIDs) + mockOVSBridgeClient.EXPECT().GetOFPort(gomock.Any(), false) + mockOVSCtlClient.EXPECT().SetPortNoFlood(gomock.Any()) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), gomock.Any(), directionIngress, actionMirror) + }, + }, + { + name: "Add TrafficControl with non-existing target port (GENEVE)", + tc: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, udpTunnel, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + extraOptions := map[string]interface{}{"key": strconv.Itoa(int(vni)), "dst_port": strconv.Itoa(int(destinationPort))} + + mockOVSBridgeClient.EXPECT().CreateTunnelPortExt(gomock.Any(), ovsconfig.TunnelType(ovsconfig.GeneveTunnel), int32(0), false, "", remoteIP, "", "", extraOptions, externalIDs) + mockOVSBridgeClient.EXPECT().GetOFPort(gomock.Any(), false) + mockOVSCtlClient.EXPECT().SetPortNoFlood(gomock.Any()) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), gomock.Any(), directionIngress, actionMirror) + }, + }, + { + name: "Add TrafficControl with non-existing target port (GRE)", + tc: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, greTunnel, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + extraOptions := map[string]interface{}{"key": strconv.Itoa(int(greKey))} + + mockOVSBridgeClient.EXPECT().CreateTunnelPortExt(gomock.Any(), ovsconfig.TunnelType(ovsconfig.GRETunnel), int32(0), false, "", remoteIP, "", "", extraOptions, externalIDs) + mockOVSBridgeClient.EXPECT().GetOFPort(gomock.Any(), false) + mockOVSCtlClient.EXPECT().SetPortNoFlood(gomock.Any()) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), gomock.Any(), directionIngress, actionMirror) + }, + }, + { + name: "Add TrafficControl with non-existing target port (ERSPAN)", + tc: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, erspanTunnel, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + extraOptions := map[string]interface{}{"erspan_ver": "2", "erspan_dir": strconv.Itoa(int(erspanDir)), "erspan_hwid": strconv.Itoa(int(erspanHwID))} + + mockOVSBridgeClient.EXPECT().CreateTunnelPortExt(gomock.Any(), ovsconfig.TunnelType(ovsconfig.ERSPANTunnel), int32(0), false, "", remoteIP, "", "", extraOptions, externalIDs) + mockOVSBridgeClient.EXPECT().GetOFPort(gomock.Any(), false) + mockOVSCtlClient.EXPECT().SetPortNoFlood(gomock.Any()) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), gomock.Any(), directionIngress, actionMirror) + }, + }, + { + name: "Add TrafficControl with existing target port and return port", + tc: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionRedirect, targetPort2, false, returnPort2), + extraInterfaces: []*interfacestore.InterfaceConfig{targetInterface2, returnInterface2}, + portToTCBindings: map[string]*portToTCBinding{ + targetPort2Name: {targetInterface2, sets.NewString(tc2Name)}, + returnPort2Name: {returnInterface2, sets.NewString(tc2Name)}, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), targetPort2OFPort, directionIngress, actionRedirect) + }, + }, + { + name: "Add TrafficControl with only Pod selector", + tc: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, targetPort1, false, nil), + extraInterfaces: []*interfacestore.InterfaceConfig{targetInterface1}, + portToTCBindings: map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc2Name)}, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), targetPort1OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Add TrafficControl with only Namespace selector", + extraInterfaces: []*interfacestore.InterfaceConfig{targetInterface1}, + portToTCBindings: map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc2Name)}, + }, + tc: generateTrafficControl(tc1Name, labels1, nil, directionIngress, actionMirror, targetPort1, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod2OFPort}), targetPort1OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Add TrafficControl with Pod selector and Namespace selector", + extraInterfaces: []*interfacestore.InterfaceConfig{targetInterface1}, + portToTCBindings: map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc2Name)}, + }, + tc: generateTrafficControl(tc1Name, labels1, labels2, directionIngress, actionRedirect, targetPort1, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, []uint32{pod2OFPort}, targetPort1OFPort, directionIngress, actionRedirect) + }, + }, + { + name: "Add TrafficControl with nil Pod selector and nil Namespace selector", + extraInterfaces: []*interfacestore.InterfaceConfig{targetInterface1}, + portToTCBindings: map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc2Name)}, + }, + tc: generateTrafficControl(tc1Name, nil, nil, directionIngress, actionRedirect, targetPort1, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, nil, targetPort1OFPort, directionIngress, actionRedirect) + }, + }, + { + name: "Add TrafficControl with empty Pod selector and empty Namespace selector", + extraInterfaces: []*interfacestore.InterfaceConfig{targetInterface1}, + portToTCBindings: map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc2Name)}, + }, + tc: generateTrafficControl(tc1Name, map[string]string{}, map[string]string{}, directionIngress, actionRedirect, targetPort1, false, nil), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, []uint32{pod1OFPort, pod2OFPort, pod3OFPort, pod4OFPort}, targetPort1OFPort, directionIngress, actionRedirect) + }, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + + c := newFakeController(t, []runtime.Object{ns1, ns2, pod1, pod2, pod3, pod4}, []runtime.Object{tt.tc}, append(interfaces, tt.extraInterfaces...)) + defer c.mockController.Finish() + + if tt.portToTCBindings != nil { + c.portToTCBindings = tt.portToTCBindings + } + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + tt.expectedCalls(c.mockOFClient, c.mockOVSBridgeClient, c.mockOVSCtlClient) + assert.NoError(t, c.syncTrafficControl(tt.tc.Name)) + }) + } +} + +func TestTrafficControlUpdate(t *testing.T) { + tc1 := generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, targetPort1, false, nil) + interfaces := []*interfacestore.InterfaceConfig{ + podInterface1, + podInterface2, + podInterface3, + podInterface4, + targetInterface1, + } + + testcases := []struct { + name string + updatedTrafficControl *v1alpha2.TrafficControl + expectedState *trafficControlState + expectedCalls func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + MockOVSCtlClient *ovsctltest.MockOVSCtlClient) + }{ + { + name: "Update TrafficControl target port (NetworkDevice)", + updatedTrafficControl: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, targetPort2, false, nil), + expectedState: generateTrafficControlState(directionIngress, actionMirror, targetPort2Name, 0, "", sets.NewInt32(int32(pod1OFPort), int32(pod3OFPort)), sets.NewString(pod1NN, pod3NN)), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOVSBridgeClient.EXPECT().DeletePort(gomock.Any()) + mockOVSBridgeClient.EXPECT().CreatePort(targetPort2Name, targetPort2Name, externalIDs) + mockOVSBridgeClient.EXPECT().GetOFPort(targetPort2Name, false) + mockOVSCtlClient.EXPECT().SetPortNoFlood(gomock.Any()) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), gomock.Any(), directionIngress, actionMirror) + }, + }, + { + name: "Update TrafficControl action", + updatedTrafficControl: generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionRedirect, targetPort1, false, returnPort1), + expectedState: generateTrafficControlState(directionIngress, actionRedirect, targetPort1Name, targetPort1OFPort, returnPort1Name, sets.NewInt32(int32(pod1OFPort), int32(pod3OFPort)), sets.NewString(pod1NN, pod3NN)), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOVSBridgeClient.EXPECT().CreatePort(returnPort1Name, returnPort1Name, externalIDs) + mockOVSBridgeClient.EXPECT().GetOFPort(returnPort1Name, false) + mockOVSCtlClient.EXPECT().SetPortNoFlood(gomock.Any()) + mockOFClient.EXPECT().InstallTrafficControlReturnPortFlow(gomock.Any()) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), targetPort1OFPort, directionIngress, actionRedirect) + }, + }, + { + name: "Update TrafficControl direction", + updatedTrafficControl: generateTrafficControl(tc1Name, nil, labels1, directionEgress, actionMirror, targetPort1, false, nil), + expectedState: generateTrafficControlState(directionEgress, actionMirror, targetPort1Name, targetPort1OFPort, "", sets.NewInt32(int32(pod1OFPort), int32(pod3OFPort)), sets.NewString(pod1NN, pod3NN)), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), targetPort1OFPort, directionEgress, actionMirror) + }, + }, + { + name: "Update TrafficControl Pod selector", + updatedTrafficControl: generateTrafficControl(tc1Name, nil, labels2, directionIngress, actionMirror, targetPort1, false, nil), + expectedState: generateTrafficControlState(directionIngress, actionMirror, targetPort1Name, targetPort1OFPort, "", sets.NewInt32(int32(pod2OFPort), int32(pod4OFPort)), sets.NewString(pod2NN, pod4NN)), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod2OFPort, pod4OFPort}), targetPort1OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Update TrafficControl Namespace selector", + updatedTrafficControl: generateTrafficControl(tc1Name, labels2, labels1, directionIngress, actionMirror, targetPort1, false, nil), + expectedState: generateTrafficControlState(directionIngress, actionMirror, targetPort1Name, targetPort1OFPort, "", sets.NewInt32(int32(pod3OFPort)), sets.NewString(pod3NN)), + expectedCalls: func(mockOFClient *openflowtest.MockClient, + mockOVSBridgeClient *ovsconfigtest.MockOVSBridgeClient, + mockOVSCtlClient *ovsctltest.MockOVSCtlClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, []uint32{pod3OFPort}, targetPort1OFPort, directionIngress, actionMirror) + }, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, []runtime.Object{ns1, ns2, pod1, pod2, pod3, pod4}, []runtime.Object{tc1}, interfaces) + defer c.mockController.Finish() + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + // Fake the status after TrafficControl tc1 is added. + c.portToTCBindings = map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc1Name)}, + } + c.tcStates = map[string]*trafficControlState{ + tc1Name: { + targetPortName: targetPort1Name, + targetOFPort: targetPort1OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(int32(pod1OFPort), int32(pod3OFPort)), + pods: sets.NewString(pod1NN, pod3NN), + }, + } + c.podToTCBindings = map[string]*podToTCBinding{ + pod1NN: {effectiveTC: tc1Name, alternativeTCs: sets.NewString()}, + pod3NN: {effectiveTC: tc1Name, alternativeTCs: sets.NewString()}, + } + + // Ignore the TrafficControl ADD events for TrafficControl tc1. + waitEvents(t, 1, c) + item, _ := c.queue.Get() + c.queue.Done(item) + + _, err := c.crdClient.CrdV1alpha2().TrafficControls().Update(context.TODO(), tt.updatedTrafficControl, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Functions are expected to be called after updating TrafficControl tc1. + tt.expectedCalls(c.mockOFClient, c.mockOVSBridgeClient, c.mockOVSCtlClient) + + time.Sleep(time.Second) + require.NoError(t, c.syncTrafficControl(tc1Name)) + require.Equal(t, tt.expectedState, c.tcStates[tc1Name]) + }) + } +} + +func TestSharedTargetPort(t *testing.T) { + tc1 := generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, targetPort1, false, nil) + tc2 := generateTrafficControl(tc2Name, nil, labels2, directionIngress, actionMirror, targetPort1, false, nil) + interfaces := []*interfacestore.InterfaceConfig{ + podInterface1, + podInterface2, + podInterface3, + podInterface4, + } + + c := newFakeController(t, []runtime.Object{pod1, pod2, pod3, pod4}, []runtime.Object{tc1, tc2}, interfaces) + defer c.mockController.Finish() + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + // Target port is expected to be crated if it doesn't exist. + c.mockOVSBridgeClient.EXPECT().CreatePort(targetPort1Name, targetPort1Name, externalIDs) + c.mockOVSBridgeClient.EXPECT().GetOFPort(targetPort1Name, false).Times(1) + c.mockOVSCtlClient.EXPECT().SetPortNoFlood(gomock.Any()) + // Mark flows for TrafficControl tc1 and tc2 are expected to be installed. + c.mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, gomock.InAnyOrder([]uint32{pod1OFPort, pod3OFPort}), gomock.Any(), directionIngress, actionMirror) + c.mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc2Name, gomock.InAnyOrder([]uint32{pod2OFPort, pod4OFPort}), gomock.Any(), directionIngress, actionMirror) + + // Process the TrafficControl ADD events for TrafficControl tc1 and tc2. + waitEvents(t, 2, c) + for i := 0; i < 2; i++ { + item, _ := c.queue.Get() + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + } + + // If TrafficControl tc1 is deleted, then TrafficControl tc2 is deleted, the created target port is expected to be + // deleted after delete all TrafficControls using the target port. + s1 := c.mockOFClient.EXPECT().UninstallTrafficControlMarkFlows(tc1Name) + s2 := c.mockOFClient.EXPECT().UninstallTrafficControlMarkFlows(tc2Name) + s3 := c.mockOVSBridgeClient.EXPECT().DeletePort(gomock.Any()) + gomock.InOrder(s1, s2, s3) + + // Delete TrafficControl tc1. + require.NoError(t, c.crdClient.CrdV1alpha2().TrafficControls().Delete(context.TODO(), tc1Name, metav1.DeleteOptions{})) + // Process the TrafficControl DELETE event. + waitEvents(t, 1, c) + item, _ := c.queue.Get() + require.Equal(t, tc1Name, item) + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + + // Delete TrafficControl tc2. + require.NoError(t, c.crdClient.CrdV1alpha2().TrafficControls().Delete(context.TODO(), tc2Name, metav1.DeleteOptions{})) + // Process the TrafficControl DELETE event. + waitEvents(t, 1, c) + item, _ = c.queue.Get() + require.Equal(t, tc2Name, item) + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) +} + +func TestPodUpdateFromCNIServer(t *testing.T) { + tc1 := generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, targetPort1, false, nil) + + c := newFakeController(t, nil, []runtime.Object{tc1}, []*interfacestore.InterfaceConfig{targetInterface1}) + defer c.mockController.Finish() + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + go c.podUpdateChannel.Run(stopCh) + + // Fake the status after TrafficControl tc1 is added. + c.portToTCBindings = map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc1Name)}, + } + c.tcStates = map[string]*trafficControlState{ + tc1Name: { + targetPortName: targetPort1Name, + targetOFPort: targetPort1OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(), + pods: sets.NewString(), + }, + } + + // Ignore the TrafficControl ADD event for TrafficControl tc1. + item, _ := c.queue.Get() + c.queue.Done(item) + + // Create a test Pod applying to the TrafficControl tc1. + _, err := c.client.CoreV1().Pods("ns1").Create(context.TODO(), pod1, metav1.CreateOptions{}) + require.NoError(t, err) + + // Process the TrafficControl event triggered by adding the test Pod. Note that, the interface of the Pod is not ready, + // and corresponding mark flows will not be installed. + waitEvents(t, 1, c) + item, _ = c.queue.Get() + require.Equal(t, tc1Name, item) + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + + // After syncing, verify the state of TrafficControl tc1. + expectedState := generateTrafficControlState(directionIngress, actionMirror, targetPort1Name, targetPort1OFPort, "", sets.NewInt32(), sets.NewString(pod1NN)) + require.Equal(t, expectedState, c.tcStates[tc1Name]) + + // Mark flows are expected to be installed after the interface of the Pod is ready. + c.mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, []uint32{pod1OFPort}, targetPort1OFPort, directionIngress, actionMirror) + + // Add the interface information of the test Pod to interface store to mock the interface of the Pod is ready, then + // add an update event to podUpdateChannel to trigger a TrafficControl event. + c.interfaceStore.AddInterface(podInterface1) + ev := types.PodUpdate{PodName: "pod1", PodNamespace: "ns1"} + c.podUpdateChannel.Notify(ev) + + // Process the TrafficControl event triggered by Pod update event from CNI server. + waitEvents(t, 1, c) + item, _ = c.queue.Get() + require.Equal(t, tc1Name, item) + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + + // After syncing, verify the state of TrafficControl tc1. + expectedState = generateTrafficControlState(directionIngress, actionMirror, targetPort1Name, targetPort1OFPort, "", sets.NewInt32(int32(pod1OFPort)), sets.NewString(pod1NN)) + require.Equal(t, expectedState, c.tcStates[tc1Name]) +} + +func TestPodLabelsUpdate(t *testing.T) { + tc1 := generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, targetPort1, false, nil) + tc2 := generateTrafficControl(tc2Name, nil, labels2, directionIngress, actionMirror, targetPort2, false, nil) + tc3 := generateTrafficControl(tc3Name, nil, labels3, directionIngress, actionMirror, targetPort3, false, nil) + interfaces := []*interfacestore.InterfaceConfig{ + podInterface1, + targetInterface1, + targetInterface2, + targetInterface3, + } + labels12 := map[string]string{"app1": "foo1", "app2": "foo2"} + labels13 := map[string]string{"app1": "foo1", "app3": "foo3"} + labels23 := map[string]string{"app2": "foo2", "app3": "foo3"} + testPod := newPod("ns1", "pod1", "fakeNode", labels12) + testPodNN := k8s.NamespacedName("ns1", "pod1") + + testcases := []struct { + name string + updatedPod *v1.Pod + eventsTriggeredByPodLabelsUpdate int + eventsTriggeredByPodLabelsUpdateOrder []interface{} + eventsTriggeredByPodEffectiveTCUpdate int + expectedPodBinding *podToTCBinding + expectedCalls func(mockOFClient *openflowtest.MockClient) + }{ + { + name: "Update Pod labels to match none TrafficControl", + updatedPod: newPod("ns1", "pod1", "fakeNode", nil), + eventsTriggeredByPodLabelsUpdate: 2, + expectedPodBinding: nil, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, nil, targetPort1OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Update Pod labels to match matching only TrafficControl tc1", + updatedPod: newPod("ns1", "pod1", "fakeNode", labels1), + eventsTriggeredByPodLabelsUpdate: 1, + expectedPodBinding: &podToTCBinding{effectiveTC: tc1Name, alternativeTCs: sets.NewString()}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + }, + }, + { + name: "SUpdate Pod labels to match only TrafficControl tc2", + updatedPod: newPod("ns1", "pod1", "fakeNode", labels2), + eventsTriggeredByPodLabelsUpdate: 1, + eventsTriggeredByPodEffectiveTCUpdate: 1, + expectedPodBinding: &podToTCBinding{effectiveTC: tc2Name, alternativeTCs: sets.NewString()}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, nil, targetPort1OFPort, directionIngress, actionMirror) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc2Name, []uint32{pod1OFPort}, targetPort2OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Update Pod labels to match TrafficControl tc2 (effective), tc3 (alternative)", + updatedPod: newPod("ns1", "pod1", "fakeNode", labels23), + eventsTriggeredByPodLabelsUpdate: 2, + eventsTriggeredByPodLabelsUpdateOrder: []interface{}{tc1Name, tc3Name}, + eventsTriggeredByPodEffectiveTCUpdate: 1, + expectedPodBinding: &podToTCBinding{effectiveTC: tc2Name, alternativeTCs: sets.NewString(tc3Name)}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, nil, targetPort1OFPort, directionIngress, actionMirror) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc2Name, []uint32{pod1OFPort}, targetPort2OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Update Pod labels to match TrafficControl tc1 (effective), tc3 (alternative)", + updatedPod: newPod("ns1", "pod1", "fakeNode", labels13), + eventsTriggeredByPodLabelsUpdate: 2, + expectedPodBinding: &podToTCBinding{effectiveTC: tc1Name, alternativeTCs: sets.NewString(tc3Name)}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + }, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, []runtime.Object{testPod}, []runtime.Object{tc1, tc2, tc3}, interfaces) + defer c.mockController.Finish() + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + // Fake the status after TrafficControl tc1, tc2 and tc3 is added. TrafficControl tc1 is the effective + // TrafficControl of the Pod, and tc2 is the alternative TrafficControl of the Pod. + c.portToTCBindings = map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc1Name)}, + targetPort2Name: {targetInterface2, sets.NewString(tc2Name)}, + targetPort3Name: {targetInterface3, sets.NewString(tc3Name)}, + } + c.tcStates = map[string]*trafficControlState{ + tc1Name: { + targetPortName: targetPort1Name, + targetOFPort: targetPort1OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(int32(pod1OFPort)), + pods: sets.NewString(pod1NN), + }, + tc2Name: { + targetPortName: targetPort2Name, + targetOFPort: targetPort2OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(), + pods: sets.NewString(pod1NN), + }, + tc3Name: { + targetPortName: targetPort3Name, + targetOFPort: targetPort3OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(), + pods: sets.NewString(), + }, + } + c.podToTCBindings = map[string]*podToTCBinding{ + pod1NN: {effectiveTC: tc1Name, alternativeTCs: sets.NewString(tc2Name)}, + } + + // Ignore the TrafficControl ADD events for TrafficControl tc1, tc2 and tc3. + waitEvents(t, 3, c) + for i := 0; i < 3; i++ { + item, _ := c.queue.Get() + c.queue.Done(item) + } + + // Functions are expected to be called after updating the labels of the Pod. + tt.expectedCalls(c.mockOFClient) + + // Update the labels of the Pod. + _, err := c.client.CoreV1().Pods("ns1").Update(context.TODO(), tt.updatedPod, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Updating the labels of the Pod will trigger events for all affected TrafficControls, but the order of the + // events is random. To make sure the test work as expected (TrafficControl tc2 is promoted to the effective + // TrafficControl of the Pod), we need to rearrange the order of events. + if len(tt.eventsTriggeredByPodLabelsUpdateOrder) != 0 { + waitEvents(t, tt.eventsTriggeredByPodLabelsUpdate, c) + var events []interface{} + for i := 0; i < tt.eventsTriggeredByPodLabelsUpdate; i++ { + item, _ := c.queue.Get() + events = append(events, item) + c.queue.Done(item) + } + require.ElementsMatch(t, tt.eventsTriggeredByPodLabelsUpdateOrder, events) + for _, event := range tt.eventsTriggeredByPodLabelsUpdateOrder { + c.queue.Add(event) + } + } + + // Process the events of TrafficControls triggered by updating the labels of the Pod. + waitEvents(t, tt.eventsTriggeredByPodLabelsUpdate, c) + for i := 0; i < tt.eventsTriggeredByPodLabelsUpdate; i++ { + item, _ := c.queue.Get() + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + } + + // Event can be also triggered by updating the effective TrafficControl of a Pod. + if tt.eventsTriggeredByPodEffectiveTCUpdate > 0 { + waitEvents(t, tt.eventsTriggeredByPodEffectiveTCUpdate, c) + for i := 0; i < tt.eventsTriggeredByPodEffectiveTCUpdate; i++ { + item, _ := c.queue.Get() + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + } + } + + // check the binding information of the Pod. + require.Equal(t, tt.expectedPodBinding, c.podToTCBindings[testPodNN]) + }) + } +} + +func TestNamespaceLabelsUpdate(t *testing.T) { + tc1 := generateTrafficControl(tc1Name, labels1, nil, directionIngress, actionMirror, targetPort1, false, nil) + tc2 := generateTrafficControl(tc2Name, labels2, nil, directionIngress, actionMirror, targetPort2, false, nil) + tc3 := generateTrafficControl(tc3Name, labels3, nil, directionIngress, actionMirror, targetPort3, false, nil) + interfaces := []*interfacestore.InterfaceConfig{ + podInterface1, + targetInterface1, + targetInterface2, + targetInterface3, + } + labels12 := map[string]string{"app1": "foo1", "app2": "foo2"} + labels13 := map[string]string{"app1": "foo1", "app3": "foo3"} + labels23 := map[string]string{"app2": "foo2", "app3": "foo3"} + testPod := newPod("ns1", "pod1", "fakeNode", map[string]string{}) + testNS := newNamespace("ns1", labels12) + testPodNN := k8s.NamespacedName("ns1", "pod1") + + testcases := []struct { + name string + updatedNS *v1.Namespace + eventsTriggeredByNSLabelsUpdate int + eventsTriggeredByNSLabelsUpdateOrder []interface{} + eventsTriggeredByPodEffectiveTCUpdate int + expectedPodBinding *podToTCBinding + expectedCalls func(mockOFClient *openflowtest.MockClient) + }{ + { + name: "Update Namespace labels to match none TrafficControl", + updatedNS: newNamespace("ns1", nil), + eventsTriggeredByNSLabelsUpdate: 2, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, nil, targetPort1OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Update Pod labels to match only TrafficControl tc1", + updatedNS: newNamespace("ns1", labels1), + eventsTriggeredByNSLabelsUpdate: 1, + expectedPodBinding: &podToTCBinding{effectiveTC: tc1Name, alternativeTCs: sets.NewString()}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + }, + }, + { + name: "Update Pod labels to match only TrafficControl tc2", + updatedNS: newNamespace("ns1", labels2), + eventsTriggeredByNSLabelsUpdate: 1, + eventsTriggeredByPodEffectiveTCUpdate: 1, + expectedPodBinding: &podToTCBinding{effectiveTC: tc2Name, alternativeTCs: sets.NewString()}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, nil, targetPort1OFPort, directionIngress, actionMirror) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc2Name, []uint32{pod1OFPort}, targetPort2OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Update Pod labels to match TrafficControl tc2 (effective), tc3 (alternative)", + updatedNS: newNamespace("ns1", labels23), + eventsTriggeredByNSLabelsUpdate: 2, + eventsTriggeredByNSLabelsUpdateOrder: []interface{}{tc1Name, tc3Name}, + eventsTriggeredByPodEffectiveTCUpdate: 1, + expectedPodBinding: &podToTCBinding{effectiveTC: tc2Name, alternativeTCs: sets.NewString(tc3Name)}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, nil, targetPort1OFPort, directionIngress, actionMirror) + mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc2Name, []uint32{pod1OFPort}, targetPort2OFPort, directionIngress, actionMirror) + }, + }, + { + name: "Update Pod labels to match TrafficControl tc1 (effective), tc3 (alternative)", + updatedNS: newNamespace("ns1", labels13), + eventsTriggeredByNSLabelsUpdate: 2, + expectedPodBinding: &podToTCBinding{effectiveTC: tc1Name, alternativeTCs: sets.NewString(tc3Name)}, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + }, + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, []runtime.Object{testNS, testPod}, []runtime.Object{tc1, tc2, tc3}, interfaces) + defer c.mockController.Finish() + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + // Fake the status after TrafficControl tc1, tc2 and tc3 is added. TrafficControl tc1 is the effective + // TrafficControl of the Pod, and tc2 is the alternative TrafficControl of the Pod. + c.portToTCBindings = map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc1Name)}, + targetPort2Name: {targetInterface2, sets.NewString(tc2Name)}, + targetPort3Name: {targetInterface3, sets.NewString(tc3Name)}, + } + c.tcStates = map[string]*trafficControlState{ + tc1Name: { + targetPortName: targetPort1Name, + targetOFPort: targetPort1OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(int32(pod1OFPort)), + pods: sets.NewString(pod1NN), + }, + tc2Name: { + targetPortName: targetPort2Name, + targetOFPort: targetPort2OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(), + pods: sets.NewString(pod1NN), + }, + tc3Name: { + targetPortName: targetPort3Name, + targetOFPort: targetPort3OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(), + pods: sets.NewString(), + }, + } + c.podToTCBindings = map[string]*podToTCBinding{ + pod1NN: {effectiveTC: tc1Name, alternativeTCs: sets.NewString(tc2Name)}, + } + + // Ignore the TrafficControl ADD events for TrafficControl tc1, tc2 and tc3. + waitEvents(t, 3, c) + for i := 0; i < 3; i++ { + item, _ := c.queue.Get() + c.queue.Done(item) + } + + // Functions are expected to be called after updating the labels of the Namespace. + tt.expectedCalls(c.mockOFClient) + + // Update the labels of the Namespace. + _, err := c.client.CoreV1().Namespaces().Update(context.TODO(), tt.updatedNS, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Updating the labels of the Namespace will trigger events for all affected TrafficControls, but the order of + // the events is random. To make sure the test work as expected (TrafficControl tc2 is promoted to the effective + // TrafficControl of the Pod in Namespace in ns1), we need to rearrange the order of events. + if len(tt.eventsTriggeredByNSLabelsUpdateOrder) != 0 { + waitEvents(t, tt.eventsTriggeredByNSLabelsUpdate, c) + var events []interface{} + for i := 0; i < tt.eventsTriggeredByNSLabelsUpdate; i++ { + item, _ := c.queue.Get() + events = append(events, item) + c.queue.Done(item) + } + require.ElementsMatch(t, tt.eventsTriggeredByNSLabelsUpdateOrder, events) + for _, event := range tt.eventsTriggeredByNSLabelsUpdateOrder { + c.queue.Add(event) + } + } + + // Process the events of TrafficControls triggered by updating the labels of the Namespace. + waitEvents(t, tt.eventsTriggeredByNSLabelsUpdate, c) + for i := 0; i < tt.eventsTriggeredByNSLabelsUpdate; i++ { + item, _ := c.queue.Get() + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + } + + // Event can be also triggered by updating the effective TrafficControl of a Pod. + if tt.eventsTriggeredByPodEffectiveTCUpdate > 0 { + waitEvents(t, tt.eventsTriggeredByPodEffectiveTCUpdate, c) + for i := 0; i < tt.eventsTriggeredByPodEffectiveTCUpdate; i++ { + item, _ := c.queue.Get() + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + } + } + + // check the binding information of the Pod. + require.Equal(t, tt.expectedPodBinding, c.podToTCBindings[testPodNN]) + }) + } +} + +func TestPodDelete(t *testing.T) { + tc1 := generateTrafficControl(tc1Name, nil, labels1, directionIngress, actionMirror, targetPort1, false, nil) + tc2 := generateTrafficControl(tc2Name, nil, labels1, directionIngress, actionMirror, targetPort2, false, nil) + tc3 := generateTrafficControl(tc3Name, nil, labels1, directionIngress, actionMirror, targetPort3, false, nil) + interfaces := []*interfacestore.InterfaceConfig{ + podInterface1, + podInterface3, + targetInterface1, + targetInterface2, + targetInterface3, + } + + c := newFakeController(t, []runtime.Object{pod1, pod3}, []runtime.Object{tc1, tc2, tc3}, interfaces) + defer c.mockController.Finish() + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + go c.localPodInformer.Run(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + + // Fake the status after TrafficControl tc1, tc2 and tc3 is added. TrafficControl tc1 is the effective + // TrafficControl of the Pod, and tc2, tc3 is the alternative TrafficControl of the Pods. + c.portToTCBindings = map[string]*portToTCBinding{ + targetPort1Name: {targetInterface1, sets.NewString(tc1Name)}, + targetPort2Name: {targetInterface2, sets.NewString(tc2Name)}, + targetPort3Name: {targetInterface3, sets.NewString(tc3Name)}, + } + c.tcStates = map[string]*trafficControlState{ + tc1Name: { + targetPortName: targetPort1Name, + targetOFPort: targetPort1OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(int32(pod1OFPort), int32(pod3OFPort)), + pods: sets.NewString(pod1NN, pod3NN), + }, + tc2Name: { + targetPortName: targetPort2Name, + targetOFPort: targetPort2OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(), + pods: sets.NewString(pod1NN, pod3NN), + }, + tc3Name: { + targetPortName: targetPort3Name, + targetOFPort: targetPort3OFPort, + action: actionMirror, + direction: directionIngress, + ofPorts: sets.NewInt32(), + pods: sets.NewString(pod1NN, pod3NN), + }, + } + c.podToTCBindings = map[string]*podToTCBinding{ + pod1NN: {effectiveTC: tc1Name, alternativeTCs: sets.NewString(tc2Name, tc3Name)}, + pod3NN: {effectiveTC: tc1Name, alternativeTCs: sets.NewString(tc2Name, tc3Name)}, + } + + // Ignore the TrafficControl ADD events for TrafficControl tc1, tc2 and tc3. + waitEvents(t, 3, c) + for i := 0; i < 3; i++ { + item, _ := c.queue.Get() + c.queue.Done(item) + } + + c.mockOFClient.EXPECT().InstallTrafficControlMarkFlows(tc1Name, []uint32{pod3OFPort}, targetPort1OFPort, directionIngress, actionMirror) + expectedPod3Binding := &podToTCBinding{ + effectiveTC: tc1Name, + alternativeTCs: sets.NewString(tc2Name, tc3Name), + } + + // Delete Pod pod1. + require.NoError(t, c.client.CoreV1().Pods(pod1.Namespace).Delete(context.TODO(), pod1.Name, metav1.DeleteOptions{})) + + // Process the TrafficControl events triggered by deleting Pod pod1. + waitEvents(t, 3, c) + for i := 0; i < 3; i++ { + item, _ := c.queue.Get() + require.NoError(t, c.syncTrafficControl(item.(string))) + c.queue.Done(item) + } + + // Check the binding information of Pod pod1, pod3 and TrafficControl. + _, exists := c.podToTCBindings[pod1NN] + require.Equal(t, false, exists) + require.Equal(t, expectedPod3Binding, c.podToTCBindings[pod3NN]) +} + +func int32Ptr(i int32) *int32 { + j := i + return &j +} + +func TestGenTunnelPortName(t *testing.T) { + testcases := []struct { + name string + ports []*v1alpha2.TrafficControlPort + expectedName string + }{ + { + name: "VXLAN", + ports: []*v1alpha2.TrafficControlPort{ + { + VXLAN: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + }, + }, + { + VXLAN: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + DestinationPort: int32Ptr(4789), + }, + }, + { + VXLAN: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + VNI: int32Ptr(0), + }, + }, + { + VXLAN: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + DestinationPort: int32Ptr(4789), + VNI: int32Ptr(0), + }, + }, + }, + expectedName: "vxlan-cb3ab8", + }, + { + name: "GENEVE", + ports: []*v1alpha2.TrafficControlPort{ + { + GENEVE: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + }, + }, + { + GENEVE: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + DestinationPort: int32Ptr(6081), + }, + }, + { + GENEVE: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + VNI: int32Ptr(0), + }, + }, + { + GENEVE: &v1alpha2.UDPTunnel{ + RemoteIP: "1.1.1.1", + DestinationPort: int32Ptr(6081), + VNI: int32Ptr(0), + }, + }, + }, + expectedName: "geneve-e17764", + }, + { + name: "GRE", + ports: []*v1alpha2.TrafficControlPort{ + { + GRE: &v1alpha2.GRETunnel{ + RemoteIP: "1.1.1.1", + }, + }, + { + GRE: &v1alpha2.GRETunnel{ + RemoteIP: "1.1.1.1", + Key: int32Ptr(0), + }, + }, + }, + expectedName: "gre-b2d3bd", + }, + { + name: "ERSPAN", + ports: []*v1alpha2.TrafficControlPort{ + { + ERSPAN: &v1alpha2.ERSPANTunnel{ + RemoteIP: "1.1.1.1", + Version: 1, + }, + }, + { + ERSPAN: &v1alpha2.ERSPANTunnel{ + RemoteIP: "1.1.1.1", + Version: 1, + }, + }, + { + ERSPAN: &v1alpha2.ERSPANTunnel{ + RemoteIP: "1.1.1.1", + Version: 1, + SessionID: int32Ptr(0), + }, + }, + { + ERSPAN: &v1alpha2.ERSPANTunnel{ + RemoteIP: "1.1.1.1", + Version: 1, + SessionID: int32Ptr(0), + Index: int32Ptr(0), + Dir: int32Ptr(0), + HardwareID: int32Ptr(0), + }, + }, + }, + expectedName: "erspan-9de667", + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + for _, port := range tt.ports { + var gotName string + switch { + case port.VXLAN != nil: + gotName = genVXLANPortName(port.VXLAN) + case port.GENEVE != nil: + gotName = genGENEVEPortName(port.GENEVE) + case port.GRE != nil: + gotName = genGREPortName(port.GRE) + case port.ERSPAN != nil: + gotName = genERSPANPortName(port.ERSPAN) + } + assert.Equal(t, tt.expectedName, gotName) + } + }) + } +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index dd4131398ba..fa838a1167a 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -33,6 +33,8 @@ const ( UplinkInterface // HostInterface is used to mark current interface is for host HostInterface + // TrafficControlInterface is used to mark current interface is for traffic control port + TrafficControlInterface AntreaInterfaceTypeKey = "antrea-type" AntreaGateway = "gateway" @@ -40,6 +42,7 @@ const ( AntreaTunnel = "tunnel" AntreaUplink = "uplink" AntreaHost = "host" + AntreaTrafficControl = "traffic-control" AntreaUnset = "" ) @@ -68,6 +71,8 @@ type TunnelInterfaceConfig struct { LocalIP net.IP // IP address of the remote Node. RemoteIP net.IP + // Destination port of the remote Node. + DestinationPort int32 // CommonName of the remote Name for certificate based authentication. RemoteName string // Pre-shard key for authentication. @@ -162,6 +167,11 @@ func NewHostInterface(hostInterfaceName string) *InterfaceConfig { return &InterfaceConfig{InterfaceName: hostInterfaceName, Type: HostInterface} } +func NewTrafficControlInterface(interfaceName string) *InterfaceConfig { + trafficControlConfig := &InterfaceConfig{InterfaceName: interfaceName, Type: TrafficControlInterface} + return trafficControlConfig +} + // TODO: remove this method after IPv4/IPv6 dual-stack is supported completely. func (c *InterfaceConfig) GetIPv4Addr() net.IP { return util.GetIPv4Addr(c.IPs) diff --git a/pkg/apis/crd/v1alpha2/register.go b/pkg/apis/crd/v1alpha2/register.go index 6de40e17e8c..0ea568f710f 100644 --- a/pkg/apis/crd/v1alpha2/register.go +++ b/pkg/apis/crd/v1alpha2/register.go @@ -54,6 +54,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ExternalIPPoolList{}, &IPPool{}, &IPPoolList{}, + &TrafficControl{}, + &TrafficControlList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) diff --git a/pkg/ovs/ovsconfig/interfaces.go b/pkg/ovs/ovsconfig/interfaces.go index 0f6907f5061..2bea4390d00 100644 --- a/pkg/ovs/ovsconfig/interfaces.go +++ b/pkg/ovs/ovsconfig/interfaces.go @@ -23,6 +23,7 @@ const ( VXLANTunnel = "vxlan" GRETunnel = "gre" STTTunnel = "stt" + ERSPANTunnel = "erspan" OVSDatapathSystem OVSDatapathType = "system" OVSDatapathNetdev OVSDatapathType = "netdev" @@ -40,7 +41,7 @@ type OVSBridgeClient interface { CreateAccessPort(name, ifDev string, externalIDs map[string]interface{}, vlanID uint16) (string, Error) CreateInternalPort(name string, ofPortRequest int32, externalIDs map[string]interface{}) (string, Error) CreateTunnelPort(name string, tunnelType TunnelType, ofPortRequest int32) (string, Error) - CreateTunnelPortExt(name string, tunnelType TunnelType, ofPortRequest int32, csum bool, localIP string, remoteIP string, remoteName string, psk string, externalIDs map[string]interface{}) (string, Error) + CreateTunnelPortExt(name string, tunnelType TunnelType, ofPortRequest int32, csum bool, localIP string, remoteIP string, remoteName string, psk string, extraOptions, externalIDs map[string]interface{}) (string, Error) CreateUplinkPort(name string, ofPortRequest int32, externalIDs map[string]interface{}) (string, Error) DeletePort(portUUID string) Error DeletePorts(portUUIDList []string) Error diff --git a/pkg/ovs/ovsconfig/ovs_client.go b/pkg/ovs/ovsconfig/ovs_client.go index 03e09027ad9..fafc55fed4d 100644 --- a/pkg/ovs/ovsconfig/ovs_client.go +++ b/pkg/ovs/ovsconfig/ovs_client.go @@ -373,7 +373,7 @@ func (br *OVSBridge) CreateInternalPort(name string, ofPortRequest int32, extern // the bridge. // If ofPortRequest is not zero, it will be passed to the OVS port creation. func (br *OVSBridge) CreateTunnelPort(name string, tunnelType TunnelType, ofPortRequest int32) (string, Error) { - return br.createTunnelPort(name, tunnelType, ofPortRequest, false, "", "", "", "", nil) + return br.createTunnelPort(name, tunnelType, ofPortRequest, false, "", "", "", "", nil, nil) } // CreateTunnelPortExt creates a tunnel port with the specified name and type @@ -384,7 +384,7 @@ func (br *OVSBridge) CreateTunnelPort(name string, tunnelType TunnelType, ofPort // psk is for the pre-shared key of IPsec ESP tunnel. If it is not empty, it // will be set to the tunnel port interface options. Flow based IPsec tunnel is // not supported, so remoteIP must be provided too when psk is not empty. -// If externalIDs is not nill, the IDs in it will be added to the port's +// If externalIDs is not nil, the IDs in it will be added to the port's // external_ids. func (br *OVSBridge) CreateTunnelPortExt( name string, @@ -395,6 +395,7 @@ func (br *OVSBridge) CreateTunnelPortExt( remoteIP string, remoteName string, psk string, + extraOptions map[string]interface{}, externalIDs map[string]interface{}) (string, Error) { if psk != "" && remoteIP == "" { return "", newInvalidArgumentsError("IPsec tunnel can not be flow based. remoteIP must be set") @@ -402,7 +403,7 @@ func (br *OVSBridge) CreateTunnelPortExt( if psk != "" && remoteName != "" { return "", newInvalidArgumentsError("Cannot set psk and remoteName together") } - return br.createTunnelPort(name, tunnelType, ofPortRequest, csum, localIP, remoteIP, remoteName, psk, externalIDs) + return br.createTunnelPort(name, tunnelType, ofPortRequest, csum, localIP, remoteIP, remoteName, psk, extraOptions, externalIDs) } func (br *OVSBridge) createTunnelPort( @@ -414,16 +415,25 @@ func (br *OVSBridge) createTunnelPort( remoteIP string, remoteName string, psk string, + extraOptions map[string]interface{}, externalIDs map[string]interface{}) (string, Error) { - if tunnelType != VXLANTunnel && tunnelType != GeneveTunnel && tunnelType != GRETunnel && tunnelType != STTTunnel { + if tunnelType != VXLANTunnel && + tunnelType != GeneveTunnel && + tunnelType != GRETunnel && + tunnelType != STTTunnel && + tunnelType != ERSPANTunnel { return "", newInvalidArgumentsError("unsupported tunnel type: " + string(tunnelType)) } if ofPortRequest < 0 || ofPortRequest > ofPortRequestMax { return "", newInvalidArgumentsError(fmt.Sprint("invalid ofPortRequest value: ", ofPortRequest)) } - options := make(map[string]interface{}, 3) + options := make(map[string]interface{}) + for k, v := range extraOptions { + options[k] = v + } + if remoteIP != "" { options["remote_ip"] = remoteIP } else { @@ -506,7 +516,6 @@ func ParseTunnelInterfaceOptions(portData *OVSPortData) (net.IP, net.IP, string, if localIPStr, ok = portData.Options["local_ip"]; ok { localIP = net.ParseIP(localIPStr) } - psk = portData.Options["psk"] if csumStr, ok := portData.Options["csum"]; ok { csum, _ = strconv.ParseBool(csumStr) diff --git a/pkg/ovs/ovsconfig/testing/mock_ovsconfig.go b/pkg/ovs/ovsconfig/testing/mock_ovsconfig.go index 35b6dee874f..fe0ae94fe16 100644 --- a/pkg/ovs/ovsconfig/testing/mock_ovsconfig.go +++ b/pkg/ovs/ovsconfig/testing/mock_ovsconfig.go @@ -151,18 +151,18 @@ func (mr *MockOVSBridgeClientMockRecorder) CreateTunnelPort(arg0, arg1, arg2 int } // CreateTunnelPortExt mocks base method -func (m *MockOVSBridgeClient) CreateTunnelPortExt(arg0 string, arg1 ovsconfig.TunnelType, arg2 int32, arg3 bool, arg4, arg5, arg6, arg7 string, arg8 map[string]interface{}) (string, ovsconfig.Error) { +func (m *MockOVSBridgeClient) CreateTunnelPortExt(arg0 string, arg1 ovsconfig.TunnelType, arg2 int32, arg3 bool, arg4, arg5, arg6, arg7 string, arg8, arg9 map[string]interface{}) (string, ovsconfig.Error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateTunnelPortExt", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) + ret := m.ctrl.Call(m, "CreateTunnelPortExt", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9) ret0, _ := ret[0].(string) ret1, _ := ret[1].(ovsconfig.Error) return ret0, ret1 } // CreateTunnelPortExt indicates an expected call of CreateTunnelPortExt -func (mr *MockOVSBridgeClientMockRecorder) CreateTunnelPortExt(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 interface{}) *gomock.Call { +func (mr *MockOVSBridgeClientMockRecorder) CreateTunnelPortExt(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTunnelPortExt", reflect.TypeOf((*MockOVSBridgeClient)(nil).CreateTunnelPortExt), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTunnelPortExt", reflect.TypeOf((*MockOVSBridgeClient)(nil).CreateTunnelPortExt), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9) } // CreateUplinkPort mocks base method diff --git a/test/e2e/trafficcontrol_test.go b/test/e2e/trafficcontrol_test.go new file mode 100644 index 00000000000..fffd0b60124 --- /dev/null +++ b/test/e2e/trafficcontrol_test.go @@ -0,0 +1,338 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "net" + "regexp" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "antrea.io/antrea/pkg/apis/crd/v1alpha2" + agentconfig "antrea.io/antrea/pkg/config/agent" +) + +type trafficControlTestConfig struct { + nodeName string + podName string + podIPs map[corev1.IPFamily]string + collectorPodName string + collectorPodIPs map[corev1.IPFamily]string +} + +var ( + vni = int32(1) + dstVXLANPort = int32(1111) + labels = map[string]string{"tc-e2e": "agnhost"} + + tcTestConfig = trafficControlTestConfig{ + podName: "test-tc-pod", + podIPs: map[corev1.IPFamily]string{}, + collectorPodName: "test-packets-collector-pod", + collectorPodIPs: map[corev1.IPFamily]string{}, + } +) + +func TestTrafficControl(t *testing.T) { + skipIfHasWindowsNodes(t) + + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + ac := func(config *agentconfig.AgentConfig) { + config.FeatureGates["TrafficControl"] = true + } + + if err = data.mutateAntreaConfigMap(nil, ac, true, true); err != nil { + t.Fatalf("Failed to enable TrafficControl feature: %v", err) + } + + tcTestConfig.nodeName = controlPlaneNodeName() + + createTrafficControlTestPod(t, data, tcTestConfig.podName) + createTrafficControlPacketsCollectorPod(t, data, tcTestConfig.collectorPodName) + + t.Run("TestMirrorToRemote", func(t *testing.T) { testMirrorToRemote(t, data) }) + t.Run("TestMirrorToLocal", func(t *testing.T) { testMirrorToLocal(t, data) }) + t.Run("TestRedirectToLocal", func(t *testing.T) { testRedirectToLocal(t, data) }) +} + +func createTrafficControlTestPod(t *testing.T, data *TestData, podName string) { + args := []string{"netexec", "--http-port=8080"} + ports := []corev1.ContainerPort{ + { + Name: "http", + ContainerPort: 8080, + Protocol: corev1.ProtocolTCP, + }, + } + mutateLabels := func(pod *corev1.Pod) { + for k, v := range labels { + pod.Labels[k] = v + } + } + + require.NoError(t, data.createPodOnNode(podName, data.testNamespace, tcTestConfig.nodeName, agnhostImage, []string{}, args, nil, ports, false, mutateLabels)) + ips, err := data.podWaitForIPs(defaultTimeout, podName, data.testNamespace) + if err != nil { + t.Fatalf("Error when waiting for IP for Pod '%s': %v", podName, err) + } + require.NoError(t, data.podWaitForRunning(defaultTimeout, podName, data.testNamespace)) + + if ips.ipv4 != nil { + tcTestConfig.podIPs[corev1.IPv4Protocol] = ips.ipv4.String() + } + if ips.ipv6 != nil { + tcTestConfig.podIPs[corev1.IPv6Protocol] = ips.ipv6.String() + } +} + +func createTrafficControlPacketsCollectorPod(t *testing.T, data *TestData, podName string) { + require.NoError(t, data.createPodOnNode(podName, data.testNamespace, tcTestConfig.nodeName, agnhostImage, []string{"sleep"}, []string{"3600"}, nil, nil, false, func(pod *corev1.Pod) { + privileged := true + pod.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{Privileged: &privileged} + })) + ips, err := data.podWaitForIPs(defaultTimeout, podName, data.testNamespace) + if err != nil { + t.Fatalf("Error when waiting for IP for Pod '%s': %v", podName, err) + } + require.NoError(t, data.podWaitForRunning(defaultTimeout, podName, data.testNamespace)) + + if ips.ipv4 != nil { + tcTestConfig.collectorPodIPs[corev1.IPv4Protocol] = ips.ipv4.String() + } + if ips.ipv6 != nil { + tcTestConfig.collectorPodIPs[corev1.IPv6Protocol] = ips.ipv6.String() + } +} + +func (data *TestData) createTrafficControl(t *testing.T, + generateName string, + matchExpressions []metav1.LabelSelectorRequirement, + matchLabels map[string]string, + direction v1alpha2.Direction, + action v1alpha2.TrafficControlAction, + targetPort interface{}, + isTargetPortVXLAN bool, + returnPort interface{}) *v1alpha2.TrafficControl { + tc := &v1alpha2.TrafficControl{ + ObjectMeta: metav1.ObjectMeta{GenerateName: generateName}, + Spec: v1alpha2.TrafficControlSpec{ + AppliedTo: v1alpha2.AppliedTo{ + PodSelector: &metav1.LabelSelector{ + MatchExpressions: matchExpressions, + MatchLabels: matchLabels, + }, + }, + Direction: direction, + Action: action, + ReturnPort: &v1alpha2.TrafficControlPort{}, + }, + } + switch targetPort.(type) { + case *v1alpha2.OVSInternalPort: + tc.Spec.TargetPort.OVSInternal = targetPort.(*v1alpha2.OVSInternalPort) + case *v1alpha2.NetworkDevice: + tc.Spec.TargetPort.Device = targetPort.(*v1alpha2.NetworkDevice) + case *v1alpha2.UDPTunnel: + if isTargetPortVXLAN { + tc.Spec.TargetPort.VXLAN = targetPort.(*v1alpha2.UDPTunnel) + } else { + tc.Spec.TargetPort.GENEVE = targetPort.(*v1alpha2.UDPTunnel) + } + case *v1alpha2.GRETunnel: + tc.Spec.TargetPort.GRE = targetPort.(*v1alpha2.GRETunnel) + case *v1alpha2.ERSPANTunnel: + tc.Spec.TargetPort.ERSPAN = targetPort.(*v1alpha2.ERSPANTunnel) + } + + switch returnPort.(type) { + case *v1alpha2.OVSInternalPort: + tc.Spec.ReturnPort.OVSInternal = returnPort.(*v1alpha2.OVSInternalPort) + case *v1alpha2.NetworkDevice: + tc.Spec.ReturnPort.Device = returnPort.(*v1alpha2.NetworkDevice) + default: + tc.Spec.ReturnPort = nil + } + + tc, err := data.crdClient.CrdV1alpha2().TrafficControls().Create(context.TODO(), tc, metav1.CreateOptions{}) + require.NoError(t, err, "Failed to create TrafficControl") + return tc +} + +func countPackets(t *testing.T, data *TestData, portName string, isPortOnNode bool, podName string, direction string) int { + var stdout, stderr string + var err error + cmd := fmt.Sprintf("ip -s link show %s", portName) + if !isPortOnNode { + stdout, stderr, err = data.RunCommandFromPod(data.testNamespace, podName, agnhostContainerName, []string{"sh", "-c", cmd}) + } else { + _, stdout, stderr, err = data.RunCommandOnNode(tcTestConfig.nodeName, cmd) + } + require.NoError(t, err) + require.Equal(t, "", stderr) + + re := regexp.MustCompile(fmt.Sprintf(`(?s)%s.*?\d+.*?(\d+)`, direction)) + matches := re.FindStringSubmatch(stdout) + require.Equal(t, 2, len(matches)) + packets, _ := strconv.Atoi(matches[1]) + + return packets +} + +func abs(a, b int) int { + if a > b { + return a - b + } + return b - a +} + +func verifyMirroredPackets(t *testing.T, data *TestData, portName string, isPortOnNode bool) { + // Get the number of received packets on the interface for receiving mirrored packets before testing mirroring. + receivedPacketsBefore := countPackets(t, data, portName, isPortOnNode, tcTestConfig.collectorPodName, "RX") + + icmpRequests := 100 + for _, ip := range tcTestConfig.podIPs { + cmd := fmt.Sprintf("ping %s -i 0.01 -c %d", ip, icmpRequests) + t.Logf("Generate packets for mirroring with command '%s'", cmd) + data.RunCommandFromPod(data.testNamespace, tcTestConfig.collectorPodName, agnhostContainerName, []string{"sh", "-c", cmd}) + } + + mirroredPackets := icmpRequests * 2 * len(tcTestConfig.podIPs) + t.Logf("The total number of mirrored packets is %d", mirroredPackets) + + // Get the number of received packets on the interface for receiving mirrored packet. + receivedPackets := countPackets(t, data, portName, isPortOnNode, tcTestConfig.collectorPodName, "RX") - receivedPacketsBefore + t.Logf("The actual number of received packets is %d", receivedPackets) + + // The difference in the number of packets mirrored and received should be within 10. + require.GreaterOrEqual(t, 10, abs(receivedPackets, mirroredPackets)) +} + +func testMirrorToRemote(t *testing.T, data *TestData) { + skipIfNotIPv4Cluster(t) + + // Create a VXLAN tunnel on the collector Pod to receive mirrored packets. + tunnelPeer := "vxlan0" + cmd := fmt.Sprintf(`ip link add %[3]s type vxlan id %[1]d dstport %[2]d dev eth0 && \ +ip link set %[3]s up`, vni, dstVXLANPort, tunnelPeer) + _, _, err := data.RunCommandFromPod(data.testNamespace, tcTestConfig.collectorPodName, agnhostContainerName, []string{"sh", "-c", cmd}) + require.NoError(t, err, "Failed to create VXLAN tunnel") + + // Create a TrafficControl whose target port is VXLAN. + targetPort := &v1alpha2.UDPTunnel{RemoteIP: tcTestConfig.collectorPodIPs[corev1.IPv4Protocol], VNI: &vni, DestinationPort: &dstVXLANPort} + + tc := data.createTrafficControl(t, "tc-", nil, labels, v1alpha2.DirectionBoth, v1alpha2.ActionMirror, targetPort, true, nil) + defer data.crdClient.CrdV1alpha2().TrafficControls().Delete(context.TODO(), tc.Name, metav1.DeleteOptions{}) + // Wait flows of the TrafficControl to be realized. + time.Sleep(time.Second) + + // Verify the number of mirrored packets. + verifyMirroredPackets(t, data, tunnelPeer, false) +} + +func testMirrorToLocal(t *testing.T, data *TestData) { + // Create a TrafficControl whose target port is OVS internal port. + portName := "test-port" + targetPort := &v1alpha2.OVSInternalPort{Name: portName} + tc := data.createTrafficControl(t, "tc-", nil, labels, v1alpha2.DirectionBoth, v1alpha2.ActionMirror, targetPort, false, nil) + defer data.crdClient.CrdV1alpha2().TrafficControls().Delete(context.TODO(), tc.Name, metav1.DeleteOptions{}) + // Wait flows of the TrafficControl to be realized. + time.Sleep(time.Second) + + // Verify the number of mirrored packets. + verifyMirroredPackets(t, data, portName, true) +} + +func verifyRedirectedPackets(t *testing.T, data *TestData, targetPort, returnPort string) { + // Get the number of received and sent packets on test Pod before testing redirect. + packetsBefore := countPackets(t, data, "eth0", false, tcTestConfig.podName, "RX") + + countPackets(t, data, "eth0", false, tcTestConfig.podName, "TX") + // Get the number of received packets on target port before testing redirect. Note that, received packets on veth pair + // are counted as TX. + packetsOnTargetPortBefore := countPackets(t, data, targetPort, true, "", "TX") + // Get the number of sent packets on return port before testing redirect. Note that, sent packets on veth pair are + // counted as RX. + packetsOnReturnPortBefore := countPackets(t, data, returnPort, true, "", "RX") + + for _, ip := range tcTestConfig.podIPs { + cmd := fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused http://%s/hostname", net.JoinHostPort(ip, "8080")) + t.Logf("Generate packets for redirecting with command '%s'", cmd) + for i := 0; i < 10; i++ { + hostname, _, err := data.RunCommandFromPod(data.testNamespace, tcTestConfig.collectorPodName, agnhostContainerName, []string{"sh", "-c", cmd}) + require.NoError(t, err) + require.Equal(t, tcTestConfig.podName, hostname) + } + } + + // Get the number of redirected packets on test Pod. + packetsOnPod := countPackets(t, data, "eth0", false, tcTestConfig.podName, "RX") + + countPackets(t, data, "eth0", false, tcTestConfig.podName, "TX") - packetsBefore + t.Logf("The total number of redirected packets on test Pod is %d", packetsOnPod) + + // Get the number of received packets on target port after testing redirect. + packetsOnTargetPort := countPackets(t, data, targetPort, true, "", "TX") - packetsOnTargetPortBefore + t.Logf("The actual number of received packets on target port is %d", packetsOnTargetPort) + + // Get the number of sent packets on return port after testing redirect. + packetsOnReturnPort := countPackets(t, data, returnPort, true, "", "RX") - packetsOnReturnPortBefore + t.Logf("The actual number of sent packets on return port is %d", packetsOnReturnPort) + + // The difference in the number of packets redirected and received should be within 10. + require.GreaterOrEqual(t, 10, abs(packetsOnTargetPort, packetsOnPod)) + require.GreaterOrEqual(t, 10, abs(packetsOnReturnPort, packetsOnPod)) + require.GreaterOrEqual(t, 10, abs(packetsOnTargetPort, packetsOnReturnPort)) +} + +func testRedirectToLocal(t *testing.T, data *TestData) { + targetPortName := "target1" + returnPortName := "return1" + tempPodName := "pod-to-create-veth-pair" + cmd := fmt.Sprintf(`ip link del dev %[1]s ; \ +ip link add dev %[1]s type veth peer name %[2]s && \ +ip link set dev %[1]s up && \ +ip link set dev %[2]s up`, targetPortName, returnPortName) + if err := data.createPodOnNode(tempPodName, data.testNamespace, tcTestConfig.nodeName, agnhostImage, []string{"sleep", "3600"}, nil, nil, nil, true, func(pod *corev1.Pod) { + privileged := true + pod.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{Privileged: &privileged} + }); err != nil { + t.Fatalf("Failed to create Pod %s: %v", tempPodName, err) + } + require.NoError(t, data.podWaitForRunning(defaultTimeout, tempPodName, data.testNamespace)) + _, _, err := data.RunCommandFromPod(data.testNamespace, tempPodName, agnhostContainerName, []string{"sh", "-c", cmd}) + require.NoError(t, err) + defer data.RunCommandFromPod(data.testNamespace, tempPodName, agnhostContainerName, []string{"sh", "-c", fmt.Sprintf("ip link del dev %s", targetPortName)}) + + targetPort := &v1alpha2.NetworkDevice{Name: targetPortName} + returnPort := &v1alpha2.NetworkDevice{Name: returnPortName} + + tc := data.createTrafficControl(t, "tc-", nil, labels, v1alpha2.DirectionBoth, v1alpha2.ActionRedirect, targetPort, false, returnPort) + defer data.crdClient.CrdV1alpha2().TrafficControls().Delete(context.TODO(), tc.Name, metav1.DeleteOptions{}) + // Wait flows of TrafficControl to be realized. + time.Sleep(time.Second) + + // Verify the number of redirected packets. + verifyRedirectedPackets(t, data, targetPortName, returnPortName) +} diff --git a/test/integration/ovs/ovs_client_test.go b/test/integration/ovs/ovs_client_test.go index 776bdb621c8..d3e86f87801 100644 --- a/test/integration/ovs/ovs_client_test.go +++ b/test/integration/ovs/ovs_client_test.go @@ -265,7 +265,7 @@ func TestTunnelOptionCsum(t *testing.T) { defer data.teardown(t) name := "vxlan0" - _, err := data.br.CreateTunnelPortExt(name, ovsconfig.VXLANTunnel, ofPortRequest, testCase.initialCsum, "", "", "", "", nil) + _, err := data.br.CreateTunnelPortExt(name, ovsconfig.VXLANTunnel, ofPortRequest, testCase.initialCsum, "", "", "", "", nil, nil) require.Nil(t, err, "Error when creating tunnel port") options, err := data.br.GetInterfaceOptions(name) require.Nil(t, err, "Error when getting interface options")