diff --git a/controller/networks.go b/controller/networks.go
deleted file mode 100644
index d9467abd..00000000
--- a/controller/networks.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package controller
-
-func Run() error {
-	return nil
-}
diff --git a/controller/networks_test.go b/controller/networks_test.go
deleted file mode 100644
index 032539ab..00000000
--- a/controller/networks_test.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package controller_test
-
-import (
-	"testing"
-
-	"github.com/maiqueb/multus-dynamic-networks-controller/controller"
-)
-
-func TestRun(t *testing.T) {
-	t.Run("networks-controller entry point (placeholder test)", func(t *testing.T) {
-		if err := controller.Run(); err != nil {
-			t.Errorf("failed execution of Run(): %v", err)
-		}
-	})
-}
diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go
new file mode 100644
index 00000000..f0cf4604
--- /dev/null
+++ b/pkg/controller/pod.go
@@ -0,0 +1,359 @@
+package controller
+
+import (
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	v1coreinformerfactory "k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	v1corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
+	nadclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
+	nadinformers "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
+	nadlisterv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
+
+	"github.com/maiqueb/multus-dynamic-networks-controller/pkg/annotations"
+)
+
+// maxRetries is the number of times a failed request is re-queued before it
+// is dropped.
+const maxRetries = 2
+
+// DynamicAttachmentRequestType identifies the operation a
+// DynamicAttachmentRequest asks for.
+type DynamicAttachmentRequestType string
+
+// Request types understood by the controller.
+const (
+	addAttachmentRequest    DynamicAttachmentRequestType = "add"
+	removeAttachmentRequest DynamicAttachmentRequestType = "remove"
+)
+
+// DynamicAttachmentRequest is the work item queued whenever the network
+// selection annotation of a pod changes.
+type DynamicAttachmentRequest struct {
+	PodName         string
+	PodNamespace    string
+	AttachmentNames []nadv1.NetworkSelectionElement
+	Type            DynamicAttachmentRequestType
+}
+
+// PodNetworksController handles the cncf networks annotations update, and
+// triggers adding / removing networks from a running pod.
+type PodNetworksController struct {
+	k8sClientSet            kubernetes.Interface
+	arePodsSynched          cache.InformerSynced
+	areNetAttachDefsSynched cache.InformerSynced
+	podsInformer            cache.SharedIndexInformer
+	netAttachDefInformer    cache.SharedIndexInformer
+	podsLister              v1corelisters.PodLister
+	netAttachDefLister      nadlisterv1.NetworkAttachmentDefinitionLister
+	broadcaster             record.EventBroadcaster
+	recorder                record.EventRecorder
+	workqueue               workqueue.RateLimitingInterface
+	confDir                 string
+	nadClientSet            nadclient.Interface
+}
+
+// NewPodNetworksController returns a new PodNetworksController instance whose
+// pod informer enqueues a request on every pod update that changes the
+// requested networks.
+func NewPodNetworksController(
+	k8sCoreInformerFactory v1coreinformerfactory.SharedInformerFactory,
+	nadInformers nadinformers.SharedInformerFactory,
+	broadcaster record.EventBroadcaster,
+	recorder record.EventRecorder,
+	confDir string,
+	k8sClientSet kubernetes.Interface,
+	nadClientSet nadclient.Interface,
+) (*PodNetworksController, error) {
+	podInformer := k8sCoreInformerFactory.Core().V1().Pods().Informer()
+	nadInformer := nadInformers.K8sCniCncfIo().V1().NetworkAttachmentDefinitions().Informer()
+
+	podNetworksController := &PodNetworksController{
+		arePodsSynched:          podInformer.HasSynced,
+		areNetAttachDefsSynched: nadInformer.HasSynced,
+		podsInformer:            podInformer,
+		podsLister:              k8sCoreInformerFactory.Core().V1().Pods().Lister(),
+		netAttachDefInformer:    nadInformer,
+		netAttachDefLister:      nadInformers.K8sCniCncfIo().V1().NetworkAttachmentDefinitions().Lister(),
+		recorder:                recorder,
+		broadcaster:             broadcaster,
+		confDir:                 confDir,
+		workqueue: workqueue.NewNamedRateLimitingQueue(
+			workqueue.DefaultControllerRateLimiter(),
+			"pod-networks-updates"),
+		k8sClientSet: k8sClientSet,
+		nadClientSet: nadClientSet,
+	}
+
+	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: podNetworksController.handlePodUpdate,
+	})
+
+	return podNetworksController, nil
+}
+
+// Start waits for the informer caches to synchronize, runs the worker, and
+// blocks until stopChan is closed. The work queue is shut down on return.
+func (pnc *PodNetworksController) Start(stopChan <-chan struct{}) {
+	klog.Infof("starting network controller")
+	defer pnc.workqueue.ShutDown()
+
+	if ok := cache.WaitForCacheSync(stopChan, pnc.arePodsSynched, pnc.areNetAttachDefsSynched); !ok {
+		// Do not serve requests from caches that never synchronized.
+		klog.Errorf("failed waiting for caches to sync")
+		return
+	}
+
+	go wait.Until(pnc.worker, time.Second, stopChan)
+	<-stopChan
+	klog.Infof("shutting down network controller")
+}
+
+// worker drains the work queue until it is shut down.
+func (pnc *PodNetworksController) worker() {
+	for pnc.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem handles a single queued request. It returns false once
+// the queue has been shut down.
+func (pnc *PodNetworksController) processNextWorkItem() bool {
+	queueItem, shouldQuit := pnc.workqueue.Get()
+	if shouldQuit {
+		return false
+	}
+	defer pnc.workqueue.Done(queueItem)
+
+	dynAttachmentRequest, ok := queueItem.(*DynamicAttachmentRequest)
+	if !ok {
+		// A foreign item can never be processed: drop it instead of panicking.
+		klog.Errorf("unexpected item of type %T in the queue", queueItem)
+		pnc.workqueue.Forget(queueItem)
+		return true
+	}
+	klog.Infof("extracted request [%v] from the queue", dynAttachmentRequest)
+	err := pnc.handleDynamicInterfaceRequest(dynAttachmentRequest)
+	pnc.handleResult(err, dynAttachmentRequest)
+
+	return true
+}
+
+// handleDynamicInterfaceRequest looks up the pod the request refers to and
+// adds or removes the requested networks. Requests of an unknown type are
+// logged and dropped.
+func (pnc *PodNetworksController) handleDynamicInterfaceRequest(dynamicAttachmentRequest *DynamicAttachmentRequest) error {
+	klog.Infof("handleDynamicInterfaceRequest: read from queue: %v", dynamicAttachmentRequest)
+
+	var updateNetworks func([]nadv1.NetworkSelectionElement, *corev1.Pod) error
+	switch dynamicAttachmentRequest.Type {
+	case addAttachmentRequest:
+		updateNetworks = pnc.addNetworks
+	case removeAttachmentRequest:
+		updateNetworks = pnc.removeNetworks
+	default:
+		// Retrying cannot fix an unknown request type, thus no error is returned.
+		klog.Warningf("very weird attachment request: %+v", dynamicAttachmentRequest)
+		return nil
+	}
+
+	pod, err := pnc.podsLister.Pods(dynamicAttachmentRequest.PodNamespace).Get(dynamicAttachmentRequest.PodName)
+	if err != nil {
+		return err
+	}
+	return updateNetworks(dynamicAttachmentRequest.AttachmentNames, pod)
+}
+
+// handleResult forgets requests that succeeded, re-queues (rate limited)
+// failed requests up to maxRetries times, and drops them afterwards.
+func (pnc *PodNetworksController) handleResult(err error, dynamicAttachmentRequest *DynamicAttachmentRequest) {
+	if err == nil {
+		pnc.workqueue.Forget(dynamicAttachmentRequest)
+		return
+	}
+
+	// NumRequeues counts the retries already scheduled for this request.
+	if pnc.workqueue.NumRequeues(dynamicAttachmentRequest) < maxRetries {
+		klog.Errorf("re-queued request for: %v: %v", dynamicAttachmentRequest, err)
+		pnc.workqueue.AddRateLimited(dynamicAttachmentRequest)
+		return
+	}
+
+	klog.Errorf("dropping request %v after %d retries: %v", dynamicAttachmentRequest, maxRetries, err)
+	pnc.workqueue.Forget(dynamicAttachmentRequest)
+}
+
+// handlePodUpdate is the pod informer update callback. It compares the
+// network selection elements of the old and new pod and queues a request for
+// the attachments to add, and another for the attachments to remove.
+func (pnc *PodNetworksController) handlePodUpdate(oldObj interface{}, newObj interface{}) {
+	oldPod, isOldPod := oldObj.(*corev1.Pod)
+	newPod, isNewPod := newObj.(*corev1.Pod)
+	if !isOldPod || !isNewPod {
+		klog.Errorf("pod update handler invoked with unexpected types: %T, %T", oldObj, newObj)
+		return
+	}
+
+	if reflect.DeepEqual(oldPod.Annotations, newPod.Annotations) {
+		return
+	}
+	podNamespace := oldPod.GetNamespace()
+	podName := oldPod.GetName()
+	klog.V(5).Infof("pod [%s] updated", namespacedName(podNamespace, podName))
+
+	oldNetworkSelectionElements, err := networkSelectionElements(oldPod.Annotations, podNamespace)
+	if err != nil {
+		klog.Errorf("failed to compute the network selection elements from the *old* pod: %v", err)
+		return
+	}
+
+	newNetworkSelectionElements, err := networkSelectionElements(newPod.Annotations, podNamespace)
+	if err != nil {
+		klog.Errorf("failed to compute the network selection elements from the *new* pod: %v", err)
+		return
+	}
+
+	toAdd := exclusiveNetworks(newNetworkSelectionElements, oldNetworkSelectionElements)
+	klog.Infof("%d attachments to add to pod %s", len(toAdd), namespacedName(podNamespace, podName))
+	if len(toAdd) > 0 {
+		pnc.workqueue.Add(
+			&DynamicAttachmentRequest{
+				PodName:         podName,
+				PodNamespace:    podNamespace,
+				AttachmentNames: toAdd,
+				Type:            addAttachmentRequest,
+			})
+	}
+
+	toRemove := exclusiveNetworks(oldNetworkSelectionElements, newNetworkSelectionElements)
+	klog.Infof("%d attachments to remove from pod %s", len(toRemove), namespacedName(podNamespace, podName))
+	if len(toRemove) > 0 {
+		pnc.workqueue.Add(
+			&DynamicAttachmentRequest{
+				PodName:         podName,
+				PodNamespace:    podNamespace,
+				AttachmentNames: toRemove,
+				Type:            removeAttachmentRequest,
+			})
+	}
+}
+
+// namespacedName formats a pod reference as "<namespace>/<name>".
+func namespacedName(podNamespace string, podName string) string {
+	return fmt.Sprintf("%s/%s", podNamespace, podName)
+}
+
+// addNetworks records an `AddedInterface` event on the pod for each network to
+// add.
+func (pnc *PodNetworksController) addNetworks(netsToAdd []nadv1.NetworkSelectionElement, pod *corev1.Pod) error {
+	for _, netToAdd := range netsToAdd {
+		klog.Infof("network to add: %v", netToAdd)
+		pnc.Eventf(pod, corev1.EventTypeNormal, "AddedInterface", "add network: %s", netToAdd.Name)
+	}
+
+	return nil
+}
+
+// removeNetworks records a `RemovedInterface` event on the pod for each
+// network to remove.
+func (pnc *PodNetworksController) removeNetworks(netsToRemove []nadv1.NetworkSelectionElement, pod *corev1.Pod) error {
+	for _, netToRemove := range netsToRemove {
+		klog.Infof("network to remove: %v", netToRemove)
+		pnc.Eventf(pod, corev1.EventTypeNormal, "RemovedInterface", "removed network: %s", netToRemove.Name)
+	}
+
+	return nil
+}
+
+// networkSelectionElements parses the network selection annotation of a pod.
+// It returns an error when the annotation is missing or cannot be parsed.
+func networkSelectionElements(podAnnotations map[string]string, podNamespace string) ([]*nadv1.NetworkSelectionElement, error) {
+	podNetworks, ok := podAnnotations[nadv1.NetworkAttachmentAnnot]
+	if !ok {
+		return nil, fmt.Errorf("the pod is missing the \"%s\" annotation on its annotations: %+v", nadv1.NetworkAttachmentAnnot, podAnnotations)
+	}
+	podNetworkSelectionElements, err := annotations.ParsePodNetworkAnnotations(podNetworks, podNamespace)
+	if err != nil {
+		return nil, fmt.Errorf("failed to extract the network selection elements: %w", err)
+	}
+	return podNetworkSelectionElements, nil
+}
+
+// networkStatus parses the network status annotation of a pod. It returns an
+// error when the annotation is missing or is not valid JSON.
+func networkStatus(podAnnotations map[string]string) ([]nadv1.NetworkStatus, error) {
+	podNetworkstatus, ok := podAnnotations[nadv1.NetworkStatusAnnot]
+	if !ok {
+		return nil, fmt.Errorf("the pod is missing the \"%s\" annotation on its annotations: %+v", nadv1.NetworkStatusAnnot, podAnnotations)
+	}
+	var netStatus []nadv1.NetworkStatus
+	if err := json.Unmarshal([]byte(podNetworkstatus), &netStatus); err != nil {
+		return nil, err
+	}
+
+	return netStatus, nil
+}
+
+// exclusiveNetworks returns the needles that are not present in the haystack.
+// Elements are matched by namespace, name and, when set, interface request.
+func exclusiveNetworks(
+	needles []*nadv1.NetworkSelectionElement,
+	haystack []*nadv1.NetworkSelectionElement) []nadv1.NetworkSelectionElement {
+
+	setOfNeedles := indexNetworkSelectionElements(needles)
+	haystackSet := indexNetworkSelectionElements(haystack)
+
+	var unmatchedNetworks []nadv1.NetworkSelectionElement
+	for needleNetName, needle := range setOfNeedles {
+		if _, ok := haystackSet[needleNetName]; !ok {
+			unmatchedNetworks = append(unmatchedNetworks, needle)
+		}
+	}
+	return unmatchedNetworks
+}
+
+// indexNetworkSelectionElements indexes the list by the key computed by
+// networkSelectionElementIndexKey.
+func indexNetworkSelectionElements(list []*nadv1.NetworkSelectionElement) map[string]nadv1.NetworkSelectionElement {
+	indexedNetworkSelectionElements := make(map[string]nadv1.NetworkSelectionElement)
+	for k := range list {
+		indexedNetworkSelectionElements[networkSelectionElementIndexKey(*list[k])] = *list[k]
+	}
+	return indexedNetworkSelectionElements
+}
+
+// networkSelectionElementIndexKey returns "<namespace>/<name>", with the
+// requested interface name appended when the element specifies one.
+func networkSelectionElementIndexKey(netSelectionElement nadv1.NetworkSelectionElement) string {
+	if netSelectionElement.InterfaceRequest != "" {
+		return fmt.Sprintf(
+			"%s/%s/%s",
+			netSelectionElement.Namespace,
+			netSelectionElement.Name,
+			netSelectionElement.InterfaceRequest)
+	}
+
+	return fmt.Sprintf(
+		"%s/%s",
+		netSelectionElement.Namespace,
+		netSelectionElement.Name)
+}
+
+// Eventf records an event for the object; it is a no-op when the controller
+// or its recorder is nil.
+func (pnc *PodNetworksController) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+	if pnc != nil && pnc.recorder != nil {
+		pnc.recorder.Eventf(object, eventtype, reason, messageFmt, args...)
+	}
+}
diff --git a/pkg/controller/pod_test.go b/pkg/controller/pod_test.go
new file mode 100644
index 00000000..580cab24
--- /dev/null
+++ b/pkg/controller/pod_test.go
@@ -0,0 +1,390 @@
+package controller
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	v1coreinformerfactory "k8s.io/client-go/informers"
+	k8sclient "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+
+	nad "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
+	nadclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
+	fakenadclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
+	nadinformers "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
+)
+
+// TestController hooks the Ginkgo suite into `go test`.
+func TestController(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Dynamic network attachment controller suite")
+}
+
+var _ = Describe("Dynamic Attachment controller", func() {
+	Context("with access to a proper multus configuration", func() {
+		var cniConfigDir string
+
+		BeforeEach(func() {
+			const (
+				configFilePermissions = 0755
+				multusConfigPath      = "00-multus.conf"
+			)
+
+			var err error
+			cniConfigDir, err = ioutil.TempDir("", "multus-config")
+			Expect(err).ToNot(HaveOccurred())
+			Expect(os.MkdirAll(cniConfigDir, configFilePermissions)).To(Succeed())
+			Expect(ioutil.WriteFile(
+				path.Join(cniConfigDir, multusConfigPath),
+				[]byte(dummyMultusConfig()), configFilePermissions)).To(Succeed())
+		})
+
+		AfterEach(func() {
+			Expect(os.RemoveAll(cniConfigDir)).To(Succeed())
+		})
+
+		Context("with an existing running pod", func() {
+			const (
+				cniVersion  = "0.3.0"
+				namespace   = "default"
+				networkName = "tiny-net"
+				podName     = "tiny-winy-pod"
+			)
+			var (
+				eventRecorder *record.FakeRecorder
+				k8sClient     k8sclient.Interface
+				pod           *corev1.Pod
+				networkToAdd  string
+				stopChannel   chan struct{}
+			)
+
+			networkStatusNames := func(statuses []nad.NetworkStatus) []string {
+				var names []string
+				for _, status := range statuses {
+					names = append(names, status.Name)
+				}
+				return names
+			}
+
+			BeforeEach(func() {
+				pod = podSpec(podName, namespace, networkName)
+				k8sClient = fake.NewSimpleClientset(pod)
+				networkToAdd = fmt.Sprintf("%s-2", networkName)
+				nadClient, err := newFakeNetAttachDefClient(
+					netAttachDef(networkName, namespace, dummyNetSpec(networkName, cniVersion)),
+					netAttachDef(networkToAdd, namespace, dummyNetSpec(networkToAdd, cniVersion)))
+				Expect(err).NotTo(HaveOccurred())
+				stopChannel = make(chan struct{})
+				const maxEvents = 5
+				eventRecorder = record.NewFakeRecorder(maxEvents)
+				Expect(newDummyPodController(k8sClient, nadClient, stopChannel, eventRecorder, "")).NotTo(BeNil())
+				Expect(func() []nad.NetworkStatus {
+					pod, err := k8sClient.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
+					if err != nil {
+						return nil
+					}
+					status, err := networkStatus(pod.Annotations)
+					if err != nil {
+						return nil
+					}
+					return status
+				}()).Should(
+					And(
+						WithTransform(networkStatusNames, ContainElements(namespacedName(namespace, networkName))),
+						Not(WithTransform(networkStatusNames, ContainElements(namespacedName(namespace, networkToAdd))))),
+				)
+			})
+
+			AfterEach(func() {
+				close(stopChannel)
+			})
+
+			When("an attachment is added to the pod's network annotations", func() {
+				BeforeEach(func() {
+					var err error
+					_, err = k8sClient.CoreV1().Pods(namespace).UpdateStatus(
+						context.TODO(),
+						updatePodSpec(pod, networkName, networkToAdd),
+						metav1.UpdateOptions{})
+					Expect(err).NotTo(HaveOccurred())
+				})
+
+				It("an `AddedInterface` event is seen in the event recorded ", func() {
+					expectedEventPayload := fmt.Sprintf("Normal AddedInterface add network: %s", networkToAdd)
+					// Poll the channel: a bare receive would block forever when no event is sent.
+					Eventually(eventRecorder.Events).Should(Receive(Equal(expectedEventPayload)))
+				})
+			})
+
+			When("an attachment is removed from the pod's network annotations", func() {
+				BeforeEach(func() {
+					var err error
+					_, err = k8sClient.CoreV1().Pods(namespace).UpdateStatus(
+						context.TODO(),
+						updatePodSpec(pod),
+						metav1.UpdateOptions{})
+					Expect(err).NotTo(HaveOccurred())
+				})
+
+				It("an `RemovedInterface` event is seen in the event recorded ", func() {
+					expectedEventPayload := fmt.Sprintf("Normal RemovedInterface removed network: %s", networkName)
+					// Poll the channel: a bare receive would block forever when no event is sent.
+					Eventually(eventRecorder.Events).Should(Receive(Equal(expectedEventPayload)))
+				})
+			})
+		})
+	})
+})
+
+// dummyPodController wraps the controller under test and exposes the informer
+// stores so the tests can seed them.
+type dummyPodController struct {
+	*PodNetworksController
+	networkCache cache.Store
+	podCache     cache.Store
+}
+
+// newDummyPodController builds a controller on top of the fake clients, marks
+// its caches as synchronized, seeds them from the clients, and starts it.
+func newDummyPodController(
+	k8sClient k8sclient.Interface,
+	nadClient nadclient.Interface,
+	stopChannel chan struct{},
+	recorder record.EventRecorder,
+	cniConfigPath string) (*dummyPodController, error) {
+
+	const noResyncPeriod = 0
+	netAttachDefInformerFactory := nadinformers.NewSharedInformerFactory(nadClient, noResyncPeriod)
+	podInformerFactory := v1coreinformerfactory.NewSharedInformerFactory(k8sClient, noResyncPeriod)
+
+	podController, err := NewPodNetworksController(
+		podInformerFactory,
+		netAttachDefInformerFactory,
+		nil,
+		recorder,
+		cniConfigPath,
+		k8sClient,
+		nadClient)
+	if err != nil {
+		return nil, err
+	}
+
+	alwaysReady := func() bool { return true }
+	podController.arePodsSynched = alwaysReady
+	podController.areNetAttachDefsSynched = alwaysReady
+
+	podInformerFactory.Start(stopChannel)
+	netAttachDefInformerFactory.Start(stopChannel)
+
+	controller := &dummyPodController{
+		PodNetworksController: podController,
+		networkCache:          podController.netAttachDefInformer.GetStore(),
+		podCache:              podController.podsInformer.GetStore(),
+	}
+
+	if err := controller.initControllerCaches(k8sClient, nadClient); err != nil {
+		return nil, err
+	}
+	go podController.Start(stopChannel)
+
+	return controller, nil
+}
+
+// newFakeNetAttachDefClient returns a fake client whose tracker holds the
+// given network attachment definitions.
+func newFakeNetAttachDefClient(networkAttachments ...nad.NetworkAttachmentDefinition) (nadclient.Interface, error) {
+	netAttachDefClient := fakenadclient.NewSimpleClientset()
+	gvr := metav1.GroupVersionResource{
+		Group:    "k8s.cni.cncf.io",
+		Version:  "v1",
+		Resource: "network-attachment-definitions",
+	}
+
+	for i := range networkAttachments {
+		networkAttachment := &networkAttachments[i]
+		if err := netAttachDefClient.Tracker().Create(schema.GroupVersionResource(gvr), networkAttachment, networkAttachment.GetNamespace()); err != nil {
+			return nil, err
+		}
+	}
+	return netAttachDefClient, nil
+}
+
+// initControllerCaches seeds the pod and network attachment stores.
+func (dpc *dummyPodController) initControllerCaches(k8sClient k8sclient.Interface, nadClient nadclient.Interface) error {
+	if err := dpc.synchPods(k8sClient); err != nil {
+		return err
+	}
+	if err := dpc.synchNetworkAttachments(nadClient); err != nil {
+		return err
+	}
+	return nil
+}
+
+// synchNetworkAttachments copies every network attachment definition known to
+// the client into the informer store.
+func (dpc *dummyPodController) synchNetworkAttachments(netAttachDefClient nadclient.Interface) error {
+	const allNamespaces = ""
+
+	networkAttachments, err := netAttachDefClient.K8sCniCncfIoV1().NetworkAttachmentDefinitions(allNamespaces).List(
+		context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+	for i := range networkAttachments.Items {
+		// Store a pointer to the slice element, not to the loop variable.
+		if err := dpc.networkCache.Add(&networkAttachments.Items[i]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// synchPods copies every pod known to the client into the informer store.
+func (dpc *dummyPodController) synchPods(k8sClient k8sclient.Interface) error {
+	const allNamespaces = ""
+
+	pods, err := k8sClient.CoreV1().Pods(allNamespaces).List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+	for i := range pods.Items {
+		// Store a pointer to the slice element, not to the loop variable.
+		if err := dpc.podCache.Add(&pods.Items[i]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// dummyNetSpec returns a macvlan CNI configuration for the given network.
+func dummyNetSpec(networkName string, cniVersion string) string {
+	return fmt.Sprintf(`{
+		"cniVersion": "%s",
+		"name": "%s",
+		"type": "macvlan",
+		"master": "eth0",
+		"mode": "bridge"
+	}`, cniVersion, networkName)
+}
+
+// podSpec returns a pod whose annotations request - and report as attached -
+// the given networks.
+func podSpec(name string, namespace string, networks ...string) *corev1.Pod {
+	return &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        name,
+			Namespace:   namespace,
+			Annotations: podNetworkConfig(networks...),
+		},
+		Status: corev1.PodStatus{
+			ContainerStatuses: []corev1.ContainerStatus{
+				{
+					ContainerID: name,
+				},
+			},
+		},
+	}
+}
+
+// netAttachDef returns a network attachment definition holding the config.
+func netAttachDef(netName string, namespace string, config string) nad.NetworkAttachmentDefinition {
+	return nad.NetworkAttachmentDefinition{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      netName,
+			Namespace: namespace,
+		},
+		Spec: nad.NetworkAttachmentDefinitionSpec{
+			Config: config,
+		},
+	}
+}
+
+// updatePodSpec returns a copy of the pod whose network selection annotation
+// requests exactly the given networks.
+func updatePodSpec(pod *corev1.Pod, networkNames ...string) *corev1.Pod {
+	newPod := pod.DeepCopy()
+	newPod.Annotations[nad.NetworkAttachmentAnnot] = generateNetworkSelectionAnnotation(
+		"default", networkNames...)
+	return newPod
+}
+
+// podNetworkConfig returns the annotations of a newly "created" pod: the
+// network selection annotation plus the matching network status.
+func podNetworkConfig(networkNames ...string) map[string]string {
+	return map[string]string{
+		nad.NetworkAttachmentAnnot: generateNetworkSelectionAnnotation("default", networkNames...),
+		nad.NetworkStatusAnnot:     podNetworkStatusAnnotations("default", networkNames...),
+	}
+}
+
+// generateNetworkSelectionAnnotation serializes the networks as a JSON list of
+// network selection elements requesting the interfaces net0, net1, ...
+func generateNetworkSelectionAnnotation(namespace string, networkNames ...string) string {
+	var netSelectionElements []nad.NetworkSelectionElement
+	for i, networkName := range networkNames {
+		netSelectionElements = append(
+			netSelectionElements,
+			nad.NetworkSelectionElement{
+				Name:             networkName,
+				Namespace:        namespace,
+				InterfaceRequest: fmt.Sprintf("net%d", i),
+			})
+	}
+	if netSelectionElements == nil {
+		netSelectionElements = make([]nad.NetworkSelectionElement, 0)
+	}
+	serializedNetSelectionElements, err := json.Marshal(netSelectionElements)
+	if err != nil {
+		return ""
+	}
+	return string(serializedNetSelectionElements)
+}
+
+// podNetworkStatusAnnotations serializes the networks as a JSON list of
+// network statuses attached to the interfaces net0, net1, ...
+func podNetworkStatusAnnotations(namespace string, networkNames ...string) string {
+	var netStatus []nad.NetworkStatus
+	for i, networkName := range networkNames {
+		netStatus = append(
+			netStatus,
+			nad.NetworkStatus{
+				Name:      fmt.Sprintf("%s/%s", namespace, networkName),
+				Interface: fmt.Sprintf("net%d", i),
+			})
+	}
+	serializedNetStatus, err := json.Marshal(netStatus)
+	if err != nil {
+		return ""
+	}
+	return string(serializedNetStatus)
+}
+
+// dummyMultusConfig returns a minimal multus CNI configuration.
+func dummyMultusConfig() string {
+	return `{
+    "name": "node-cni-network",
+    "type": "multus",
+    "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml",
+    "delegates": [{
+        "type": "weave-net"
+    }],
+    "runtimeConfig": {
+        "portMappings": [
+            {"hostPort": 8080, "containerPort": 80, "protocol": "tcp"}
+        ]
+    }
+}`
+}