Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Qiu <[email protected]>
  • Loading branch information
wenqiq committed May 3, 2022
1 parent 31ba4de commit 2bc548a
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 2 deletions.
81 changes: 79 additions & 2 deletions pkg/agent/controller/trafficcontrol/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -61,6 +62,7 @@ type tcState struct {
pods sets.String
targetPort uint32
returnPort uint32
appliedTo v1alpha2.AppliedTo
}

type Controller struct {
Expand Down Expand Up @@ -92,7 +94,8 @@ type Controller struct {

func NewTrafficControlController(ofClient openflow.Client,
interfaceStore interfacestore.InterfaceStore, ovsBridgeClient ovsconfig.OVSBridgeClient,
tcInformer trafficControlinformers.TrafficControlInformer, podInformer cache.SharedIndexInformer, namespaceInformer coreinformers.NamespaceInformer) *Controller {
tcInformer trafficControlinformers.TrafficControlInformer,
podInformer cache.SharedIndexInformer, namespaceInformer coreinformers.NamespaceInformer) *Controller {
c := &Controller{
ofClient: ofClient,
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -115,9 +118,82 @@ func NewTrafficControlController(ofClient openflow.Client,
},
resyncPeriod,
)
c.podInformer.AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addPod,
UpdateFunc: c.updatePod,
DeleteFunc: c.deletePod,
},
resyncPeriod,
)
return c
}

func (c *Controller) matchPod(pod *v1.Pod, to *v1alpha2.AppliedTo) (match bool) {
if to.NamespaceSelector != nil {
namespace, _ := c.namespaceLister.Get(pod.Namespace)
nsSelector, _ := metav1.LabelSelectorAsSelector(to.NamespaceSelector)
if !nsSelector.Matches(labels.Set(namespace.GetLabels())) {
return
}
}

podSelector, _ := metav1.LabelSelectorAsSelector(to.PodSelector)
if !podSelector.Matches(labels.Set(pod.Labels)) {
return
}
return true
}

func (c *Controller) filterAffectedTCsByPod(pod *v1.Pod) sets.String {
c.tcStatesMutex.RLock()
defer c.tcStatesMutex.RUnlock()
affectedTCs := sets.NewString()
for tcName, tcState := range c.tcStates {
if c.matchPod(pod, &tcState.appliedTo) {
affectedTCs.Insert(tcName)
}
}
return affectedTCs
}

func (c *Controller) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
klog.InfoS("Processing local Pod ADD event", "Namespace", pod.Namespace, "Pod", pod.Name, "labels", pod.Labels)
affectedTCs := c.filterAffectedTCsByPod(pod)
for tc := range affectedTCs {
c.queue.Add(tc)
}
return
}

func (c *Controller) updatePod(oldObj interface{}, obj interface{}) {
oldPod := oldObj.(*v1.Pod)
pod := obj.(*v1.Pod)
klog.InfoS("Processing Pod UPDATE event", "Namespace", pod.Namespace, "Pod", pod.Name, "labels", pod.Labels)
if !reflect.DeepEqual(pod.GetLabels(), oldPod.GetLabels()) {
oldPodAffectedTCs := c.filterAffectedTCsByPod(oldPod)
newPodAffectedTCs := c.filterAffectedTCsByPod(pod)
for tc := range oldPodAffectedTCs.Difference(newPodAffectedTCs) {
c.queue.Add(tc)
}
for tc := range newPodAffectedTCs.Difference(oldPodAffectedTCs) {
c.queue.Add(tc)
}
}
return
}

func (c *Controller) deletePod(obj interface{}) {
pod := obj.(*v1.Pod)
klog.InfoS("Processing Pod DELETE event", "Namespace", pod.Namespace, "Pod", pod.Name, "labels", pod.Labels)
affectedTCs := c.filterAffectedTCsByPod(pod)
for tc := range affectedTCs {
c.queue.Add(tc)
}
return
}

func (c *Controller) addTC(obj interface{}) {
tc := obj.(*v1alpha2.TrafficControl)
klog.InfoS("Processing TrafficControl ADD event", "trafficControl", tc.Name, "appliedTo", tc.Spec.AppliedTo)
Expand Down Expand Up @@ -504,6 +580,7 @@ func (c *Controller) syncTrafficControl(tcName string) (err error) {

tcState.targetPort = targetPort
tcState.returnPort = returnPort
tcState.appliedTo = tc.Spec.AppliedTo
}
var pods []*v1.Pod
if pods, err = c.filterPods(&tc.Spec.AppliedTo); err != nil {
Expand All @@ -516,7 +593,7 @@ func (c *Controller) syncTrafficControl(tcName string) (err error) {

var podOfPorts []uint32
for _, pod := range pods {
// TrafficControl does not support HostNetwork Pods.Ignore Pod if it's HostNetwork Pod.
// TrafficControl does not support HostNetwork Pods. Ignore Pod if it's HostNetwork Pod.
if pod.Spec.HostNetwork {
continue
}
Expand Down
53 changes: 53 additions & 0 deletions pkg/agent/controller/trafficcontrol/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ import (
"fmt"
"strconv"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -398,3 +403,51 @@ func TestSyncTrafficControl(t *testing.T) {
})
}
}

func TestTrafficControlControllerPodUpdate(t *testing.T) {
defaultContext := context.TODO()
tc := v1alpha2.TrafficControl{
ObjectMeta: metav1.ObjectMeta{Name: "tc-mirror", UID: "tc-uid"},
Spec: v1alpha2.TrafficControlSpec{
AppliedTo: v1alpha2.AppliedTo{
PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "foo"}}},
Direction: v1alpha2.DirectionIngress,
Action: v1alpha2.ActionMirror,
TargetPort: v1alpha2.TrafficControlPort{Device: &v1alpha2.NetworkDevice{Name: "test-device"}},
}}
c := newFakeController(t, nil, nil)
defer c.mockController.Finish()

stopCh := make(chan struct{})
defer close(stopCh)

go c.localPodInformer.Run(stopCh)

c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

c.informerFactory.Start(stopCh)
c.informerFactory.WaitForCacheSync(stopCh)

go c.Controller.Run(stopCh)

_, err := c.crdClient.CrdV1alpha2().TrafficControls().Create(defaultContext, &tc, metav1.CreateOptions{})
require.NoError(t, err)

pod1 := newPod("ns1", "pod1", "fakeNode", map[string]string{"app": "foo"})
_, err = c.client.CoreV1().Pods("ns1").Create(defaultContext, pod1, metav1.CreateOptions{})
require.NoError(t, err)

c.mockOFClient.EXPECT().InstallTrafficControlMarkFlows("trafficControl1", []uint32{1}, uint32(3), v1alpha2.DirectionIngress, v1alpha2.ActionMirror)

assert.NoError(t, wait.Poll(100*time.Millisecond, time.Second, func() (done bool, err error) {
return c.Controller.tcStates != nil, nil
}), "wait TCState update")
assert.Equal(t, &tcState{
ofPorts: sets.Int32{},
pods: sets.String{"ns1/pod1": sets.Empty{}},
targetPort: uint32(3),
returnPort: 0,
appliedTo: tc.Spec.AppliedTo,
}, c.Controller.tcStates["trafficControl1"])
}

0 comments on commit 2bc548a

Please sign in to comment.