Skip to content

Commit

Permalink
controller: implement level driven controller
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Duarte Barroso <[email protected]>
  • Loading branch information
maiqueb committed Nov 17, 2022
1 parent 681d482 commit 6ff8cef
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pkg/annotations/dynamic-network-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func AddDynamicIfaceToStatus(currentPod *corev1.Pod, attachmentResults ...Attach
return currentIfaceStatus, nil
}

func DeleteDynamicIfaceFromStatus(currentPod *corev1.Pod, networkSelectionElements ...*nettypes.NetworkSelectionElement) ([]nettypes.NetworkStatus, error) {
func DeleteDynamicIfaceFromStatus(currentPod *corev1.Pod, networkSelectionElements ...nettypes.NetworkSelectionElement) ([]nettypes.NetworkStatus, error) {
currentIfaceStatus, err := podDynamicNetworkStatus(currentPod)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/annotations/dynamic-network-status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var _ = Describe("NetworkStatusFromResponse", func() {
Expect(
DeleteDynamicIfaceFromStatus(
newPod(podName, namespace, initialNetStatus...),
newNetworkSelectionElementWithIface(networkName, ifaceToRemove, namespace),
*newNetworkSelectionElementWithIface(networkName, ifaceToRemove, namespace),
),
).To(Equal(expectedNetworkStatus))
},
Expand Down
219 changes: 132 additions & 87 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type DynamicAttachmentRequestType string

type DynamicAttachmentRequest struct {
Pod *corev1.Pod
Attachments []*nadv1.NetworkSelectionElement
Attachments []nadv1.NetworkSelectionElement
Type DynamicAttachmentRequestType
PodNetNS string
}
Expand Down Expand Up @@ -137,11 +137,75 @@ func (pnc *PodNetworksController) processNextWorkItem() bool {
return false
}
defer pnc.workqueue.Done(queueItem)
podNamespacedName := queueItem.(*string)
klog.Infof("extracted update request for pod [%s] from the queue", *podNamespacedName)
podNamespace, podName, err := separateNamespaceAndName(*podNamespacedName)
if err != nil {
klog.Errorf("the update key - [%s] - is not in the namespaced name format: %v", *podNamespacedName, err)
return true
}
defer pnc.handleResult(err, podNamespacedName)

var pod *corev1.Pod
pod, err = pnc.podsLister.Pods(podNamespace).Get(podName)
if err != nil {
klog.Errorf("could not access pod from the informer")
return true
}

indexedNetworkSelectionElements := indexPodNetworkSelectionElements(pod)
indexedNetworkStatus := indexNetworkStatus(pod)

netnsPath, err := pnc.netnsPath(pod)
if err != nil {
klog.Errorf("failed to figure out the pod's network namespace: %v", err)
return true
}

var attachmentsToAdd []nadv1.NetworkSelectionElement
for key := range indexedNetworkSelectionElements {
if _, wasFound := indexedNetworkStatus[key]; !wasFound {
attachmentsToAdd = append(attachmentsToAdd, indexedNetworkSelectionElements[key])
}
}

if len(attachmentsToAdd) > 0 {
err = pnc.handleDynamicInterfaceRequest(
&DynamicAttachmentRequest{
Pod: pod,
Attachments: attachmentsToAdd,
Type: add,
PodNetNS: netnsPath,
})
if err != nil {
klog.Errorf("error adding attachments: %v", err)
return true
}
}

dynAttachmentRequest := queueItem.(*DynamicAttachmentRequest)
klog.Infof("extracted request [%v] from the queue", dynAttachmentRequest)
err := pnc.handleDynamicInterfaceRequest(dynAttachmentRequest)
pnc.handleResult(err, dynAttachmentRequest)
var attachmentsToRemove []nadv1.NetworkSelectionElement
for key := range indexedNetworkStatus {
networkNamespace, networkName, _ := separateNamespaceAndName(key)
if _, wasFound := indexedNetworkSelectionElements[key]; !wasFound {
attachmentsToRemove = append(attachmentsToRemove, nadv1.NetworkSelectionElement{
Name: networkName,
Namespace: networkNamespace,
InterfaceRequest: indexedNetworkStatus[key].Interface,
})
}
}
if len(attachmentsToRemove) > 0 {
err = pnc.handleDynamicInterfaceRequest(&DynamicAttachmentRequest{
Pod: pod,
Attachments: attachmentsToRemove,
Type: remove,
PodNetNS: netnsPath,
})
if err != nil {
klog.Errorf("error removing attachments: %v")
return true
}
}

return true
}
Expand All @@ -159,79 +223,34 @@ func (pnc *PodNetworksController) handleDynamicInterfaceRequest(dynamicAttachmen
return nil
}

func (pnc *PodNetworksController) handleResult(err error, dynamicAttachmentRequest *DynamicAttachmentRequest) {
func (pnc *PodNetworksController) handleResult(err error, namespacedPodName *string) {
if err == nil {
pnc.workqueue.Forget(dynamicAttachmentRequest)
pnc.workqueue.Forget(namespacedPodName)
return
}

currentRetries := pnc.workqueue.NumRequeues(dynamicAttachmentRequest)
currentRetries := pnc.workqueue.NumRequeues(namespacedPodName)
if currentRetries <= maxRetries {
klog.Errorf("re-queued request for: %v. Error: %v", dynamicAttachmentRequest, err)
pnc.workqueue.AddRateLimited(dynamicAttachmentRequest)
klog.Errorf("re-queued request for: %s. Error: %v", *namespacedPodName, err)
pnc.workqueue.AddRateLimited(namespacedPodName)
return
}

pnc.workqueue.Forget(dynamicAttachmentRequest)
pnc.workqueue.Forget(namespacedPodName)
}

func (pnc *PodNetworksController) handlePodUpdate(oldObj interface{}, newObj interface{}) {
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)

const (
add DynamicAttachmentRequestType = "add"
remove DynamicAttachmentRequestType = "remove"
)

if !didNetworkSelectionElementsChange(oldPod, newPod) {
return
}
podNamespace := oldPod.GetNamespace()
podName := oldPod.GetName()
klog.V(logging.Debug).Infof("pod [%s] updated", annotations.NamespacedName(podNamespace, podName))

oldNetworkSelectionElements, err := networkSelectionElements(oldPod.Annotations, podNamespace)
if err != nil {
klog.Errorf("failed to compute the network selection elements from the *old* pod")
return
}

newNetworkSelectionElements, err := networkSelectionElements(newPod.Annotations, podNamespace)
if err != nil {
klog.Errorf("failed to compute the network selection elements from the *new* pod")
return
}

toAdd := exclusiveNetworks(newNetworkSelectionElements, oldNetworkSelectionElements)
klog.Infof("%d attachments to add to pod %s", len(toAdd), annotations.NamespacedName(podNamespace, podName))

netnsPath, err := pnc.netnsPath(newPod)
if err != nil {
klog.Errorf("failed to figure out the pod's network namespace: %v", err)
return
}
if len(toAdd) > 0 {
pnc.workqueue.Add(
&DynamicAttachmentRequest{
Pod: newPod,
Attachments: toAdd,
Type: add,
PodNetNS: netnsPath,
})
}
namespacedName := annotations.NamespacedName(oldPod.GetNamespace(), oldPod.GetName())
klog.V(logging.Debug).Infof("pod [%s] updated", namespacedName)

toRemove := exclusiveNetworks(oldNetworkSelectionElements, newNetworkSelectionElements)
klog.Infof("%d attachments to remove from pod %s", len(toRemove), annotations.NamespacedName(podNamespace, podName))
if len(toRemove) > 0 {
pnc.workqueue.Add(
&DynamicAttachmentRequest{
Pod: newPod,
Attachments: toRemove,
Type: remove,
PodNetNS: netnsPath,
})
}
pnc.workqueue.Add(&namespacedName)
}

func (pnc *PodNetworksController) addNetworks(dynamicAttachmentRequest *DynamicAttachmentRequest) error {
Expand Down Expand Up @@ -261,19 +280,19 @@ func (pnc *PodNetworksController) addNetworks(dynamicAttachmentRequest *DynamicA
pod.GetName(),
string(pod.UID),
netAttachDefWithDefaults,
interfaceAttributes(*netToAdd),
interfaceAttributes(netToAdd),
))

if err != nil {
return fmt.Errorf("failed to ADD delegate: %v", err)
}
klog.Infof("response: %v", *response.Result)

attachmentResults = append(attachmentResults, *annotations.NewAttachmentResult(netToAdd, response))
pnc.Eventf(pod, corev1.EventTypeNormal, "AddedInterface", addIfaceEventFormat(pod, netToAdd))
attachmentResults = append(attachmentResults, *annotations.NewAttachmentResult(&netToAdd, response))
pnc.Eventf(pod, corev1.EventTypeNormal, "AddedInterface", addIfaceEventFormat(pod, &netToAdd))
klog.Infof(
"added interface %s to pod %s",
annotations.NamespacedName(netToAdd.Namespace, netToAdd.Name),
networkSelectionElementIndexKey(netToAdd),
annotations.NamespacedName(pod.GetNamespace(), pod.GetName()),
)
}
Expand All @@ -293,7 +312,7 @@ func (pnc *PodNetworksController) addNetworks(dynamicAttachmentRequest *DynamicA
func (pnc *PodNetworksController) removeNetworks(dynamicAttachmentRequest *DynamicAttachmentRequest) error {
pod := dynamicAttachmentRequest.Pod

var removedNets []*nadv1.NetworkSelectionElement
var removedNets []nadv1.NetworkSelectionElement
for i := range dynamicAttachmentRequest.Attachments {
netToRemove := dynamicAttachmentRequest.Attachments[i]
klog.Infof("network to remove: %v", dynamicAttachmentRequest.Attachments[i])
Expand All @@ -318,17 +337,17 @@ func (pnc *PodNetworksController) removeNetworks(dynamicAttachmentRequest *Dynam
pod.GetName(),
string(pod.UID),
netAttachDefWithDefaults,
interfaceAttributes(*netToRemove),
interfaceAttributes(netToRemove),
))
if err != nil {
return fmt.Errorf("failed to remove delegate: %v", err)
}

removedNets = append(removedNets, netToRemove)
pnc.Eventf(pod, corev1.EventTypeNormal, "RemovedInterface", removeIfaceEventFormat(pod, netToRemove))
pnc.Eventf(pod, corev1.EventTypeNormal, "RemovedInterface", removeIfaceEventFormat(pod, &netToRemove))
klog.Infof(
"removed interface %s from pod %s",
annotations.NamespacedName(netToRemove.Namespace, netToRemove.Name),
networkSelectionElementIndexKey(netToRemove),
annotations.NamespacedName(pod.GetNamespace(), pod.GetName()),
)
}
Expand All @@ -348,17 +367,22 @@ func (pnc *PodNetworksController) removeNetworks(dynamicAttachmentRequest *Dynam
return nil
}

func networkSelectionElements(podAnnotations map[string]string, podNamespace string) ([]*nadv1.NetworkSelectionElement, error) {
func networkSelectionElements(podAnnotations map[string]string, podNamespace string) ([]nadv1.NetworkSelectionElement, error) {
podNetworks, ok := podAnnotations[nadv1.NetworkAttachmentAnnot]
if !ok || podNetworks == "" {
return []*nadv1.NetworkSelectionElement{}, nil
return []nadv1.NetworkSelectionElement{}, nil
}
podNetworkSelectionElements, err := annotations.ParsePodNetworkAnnotations(podNetworks, podNamespace)
if err != nil {
klog.Errorf("failed to extract the network selection elements: %v", err)
return nil, err
}
return podNetworkSelectionElements, nil

var currentPodNetworkSelectionElements []nadv1.NetworkSelectionElement
for i := range podNetworkSelectionElements {
currentPodNetworkSelectionElements = append(currentPodNetworkSelectionElements, *podNetworkSelectionElements[i])
}
return currentPodNetworkSelectionElements, nil
}

func networkStatus(podAnnotations map[string]string) ([]nadv1.NetworkStatus, error) {
Expand All @@ -374,27 +398,33 @@ func networkStatus(podAnnotations map[string]string) ([]nadv1.NetworkStatus, err
return netStatus, nil
}

func exclusiveNetworks(
needles []*nadv1.NetworkSelectionElement,
haystack []*nadv1.NetworkSelectionElement) []*nadv1.NetworkSelectionElement {
setOfNeedles := indexNetworkSelectionElements(needles)
haystackSet := indexNetworkSelectionElements(haystack)

var unmatchedNetworks []*nadv1.NetworkSelectionElement
for needleNetName, needle := range setOfNeedles {
if _, ok := haystackSet[needleNetName]; !ok {
unmatchedNetworks = append(unmatchedNetworks, needle)
}
func indexPodNetworkSelectionElements(pod *corev1.Pod) map[string]nadv1.NetworkSelectionElement {
currentPodNetworkSelectionElements, err := networkSelectionElements(pod.Annotations, pod.GetNamespace())
if err != nil {
klog.Errorf("could not read pod's network selection elements: %v", *pod)
return map[string]nadv1.NetworkSelectionElement{}
}
return unmatchedNetworks
indexedNetworkSelectionElements := make(map[string]nadv1.NetworkSelectionElement)
for k := range currentPodNetworkSelectionElements {
netSelectionElement := currentPodNetworkSelectionElements[k]
indexedNetworkSelectionElements[networkSelectionElementIndexKey(netSelectionElement)] = netSelectionElement
}
return indexedNetworkSelectionElements
}

func indexNetworkSelectionElements(list []*nadv1.NetworkSelectionElement) map[string]*nadv1.NetworkSelectionElement {
indexedNetworkSelectionElements := make(map[string]*nadv1.NetworkSelectionElement)
for k := range list {
indexedNetworkSelectionElements[networkSelectionElementIndexKey(*list[k])] = list[k]
func indexNetworkStatus(pod *corev1.Pod) map[string]nadv1.NetworkStatus {
currentPodNetworkStatus, err := networkStatus(pod.Annotations)
if err != nil {
klog.Errorf("could not read pod's network status: %v", *pod)
return map[string]nadv1.NetworkStatus{}
}
return indexedNetworkSelectionElements
indexedNetworkStatus := map[string]nadv1.NetworkStatus{}
for i := range currentPodNetworkStatus {
if !currentPodNetworkStatus[i].Default {
indexedNetworkStatus[networkStatusIndexKey(currentPodNetworkStatus[i])] = currentPodNetworkStatus[i]
}
}
return indexedNetworkStatus
}

func networkSelectionElementIndexKey(netSelectionElement nadv1.NetworkSelectionElement) string {
Expand Down Expand Up @@ -490,3 +520,18 @@ func didNetworkSelectionElementsChange(oldPod *corev1.Pod, newPod *corev1.Pod) b
}
return false
}

func networkStatusIndexKey(networkStatus nadv1.NetworkStatus) string {
return fmt.Sprintf(
"%s/%s",
networkStatus.Name,
networkStatus.Interface)
}

func separateNamespaceAndName(namespacedName string) (namespace string, name string, err error) {
splitNamespacedName := strings.Split(namespacedName, "/")
if len(splitNamespacedName) != 2 && len(splitNamespacedName) != 3 {
return "", "", fmt.Errorf("invalid namespaced name: %s", namespacedName)
}
return splitNamespacedName[0], splitNamespacedName[1], nil
}

0 comments on commit 6ff8cef

Please sign in to comment.