From e7ce89976bca7821b195bbd47968b63a0eb099f2 Mon Sep 17 00:00:00 2001 From: Mengdie Song Date: Wed, 1 Jun 2022 04:33:55 +0800 Subject: [PATCH] [ExternalNode]Create/Delete AntreaAgentInfo based on ExternalNode (#3757) Antrea controller watches ExternalNode create and delete events. It creates an AntreaAgentInfo whose name is the same as the ExternalNode name after the ExternalNode is created, and deletes the AntreaAgentInfo when the ExternalNode is deleted. The change also refactors the monitoring part for the Node case and lets the controller create AntreaAgentInfo for Node too. With this change, for both the Node and ExternalNode cases, it is always the Antrea controller that creates/deletes AntreaAgentInfo and always the Antrea agent that updates it. Signed-off-by: Mengdie Song --- .../antrea/templates/agent/clusterrole.yaml | 2 - .../templates/controller/clusterrole.yaml | 1 + build/yamls/antrea-aks.yml | 3 +- build/yamls/antrea-eks.yml | 3 +- build/yamls/antrea-gke.yml | 3 +- build/yamls/antrea-ipsec.yml | 3 +- build/yamls/antrea.yml | 3 +- cmd/antrea-controller/controller.go | 3 +- pkg/monitor/agent.go | 27 +- pkg/monitor/controller.go | 257 +++++++++++++++--- 10 files changed, 233 insertions(+), 72 deletions(-) diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 10e847a669b..0a5a504e0e5 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -57,9 +57,7 @@ rules: - antreaagentinfos verbs: - get - - create - update - - delete - apiGroups: - controlplane.antrea.io resources: diff --git a/build/charts/antrea/templates/controller/clusterrole.yaml b/build/charts/antrea/templates/controller/clusterrole.yaml index e9a86b199c2..998b76e9040 100644 --- a/build/charts/antrea/templates/controller/clusterrole.yaml +++ b/build/charts/antrea/templates/controller/clusterrole.yaml @@ -143,6 +143,7 @@ rules: - antreaagentinfos verbs: - list + - create - delete - 
apiGroups: - crd.antrea.io diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index d511f7fc4f1..2411fa12045 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2940,9 +2940,7 @@ rules: - antreaagentinfos verbs: - get - - create - update - - delete - apiGroups: - controlplane.antrea.io resources: @@ -3287,6 +3285,7 @@ rules: - antreaagentinfos verbs: - list + - create - delete - apiGroups: - crd.antrea.io diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index d40c47c3f7c..82034f2b3bb 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2940,9 +2940,7 @@ rules: - antreaagentinfos verbs: - get - - create - update - - delete - apiGroups: - controlplane.antrea.io resources: @@ -3287,6 +3285,7 @@ rules: - antreaagentinfos verbs: - list + - create - delete - apiGroups: - crd.antrea.io diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 02f4b02b937..ef08638b74e 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2940,9 +2940,7 @@ rules: - antreaagentinfos verbs: - get - - create - update - - delete - apiGroups: - controlplane.antrea.io resources: @@ -3287,6 +3285,7 @@ rules: - antreaagentinfos verbs: - list + - create - delete - apiGroups: - crd.antrea.io diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 4f1431eef76..468b39407f2 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2953,9 +2953,7 @@ rules: - antreaagentinfos verbs: - get - - create - update - - delete - apiGroups: - controlplane.antrea.io resources: @@ -3300,6 +3298,7 @@ rules: - antreaagentinfos verbs: - list + - create - delete - apiGroups: - crd.antrea.io diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index fb60f5b759f..8837b614606 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2940,9 +2940,7 @@ rules: - antreaagentinfos verbs: - get - - create - update - - delete - apiGroups: - 
controlplane.antrea.io resources: @@ -3287,6 +3285,7 @@ rules: - antreaagentinfos verbs: - list + - create - delete - apiGroups: - crd.antrea.io diff --git a/cmd/antrea-controller/controller.go b/cmd/antrea-controller/controller.go index 1d5afab27da..5cc6c6bf04d 100644 --- a/cmd/antrea-controller/controller.go +++ b/cmd/antrea-controller/controller.go @@ -172,7 +172,8 @@ func run(o *Options) error { controllerQuerier := querier.NewControllerQuerier(networkPolicyController, o.config.APIPort) - controllerMonitor := monitor.NewControllerMonitor(crdClient, nodeInformer, controllerQuerier) + externalNodeEnabled := features.DefaultFeatureGate.Enabled(features.ExternalNode) + controllerMonitor := monitor.NewControllerMonitor(crdClient, nodeInformer, externalNodeInformer, controllerQuerier, externalNodeEnabled) var egressController *egress.EgressController var externalIPPoolController *externalippool.ExternalIPPoolController diff --git a/pkg/monitor/agent.go b/pkg/monitor/agent.go index 5edd369c1f9..dba77aac562 100644 --- a/pkg/monitor/agent.go +++ b/pkg/monitor/agent.go @@ -18,7 +18,6 @@ import ( "context" "time" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -59,30 +58,20 @@ func (monitor *agentMonitor) syncAgentCRD() { if monitor.agentCRD, err = monitor.updateAgentCRD(true); err == nil { return } - klog.Errorf("Failed to partially update agent monitoring CRD: %v", err) + klog.ErrorS(err, "Failed to partially update agent monitoring CRD") monitor.agentCRD = nil } monitor.agentCRD, err = monitor.getAgentCRD() - - if errors.IsNotFound(err) { - monitor.agentCRD, err = monitor.createAgentCRD() - if err != nil { - klog.Errorf("Failed to create agent monitoring CRD: %v", err) - monitor.agentCRD = nil - } - return - } - if err != nil { - klog.Errorf("Failed to get agent monitoring CRD: %v", err) + klog.ErrorS(err, "Failed to get agent monitoring CRD") monitor.agentCRD = nil return } 
monitor.agentCRD, err = monitor.updateAgentCRD(false) if err != nil { - klog.Errorf("Failed to entirely update agent monitoring CRD: %v", err) + klog.ErrorS(err, "Failed to entirely update agent monitoring CRD") monitor.agentCRD = nil } } @@ -91,18 +80,10 @@ func (monitor *agentMonitor) syncAgentCRD() { // So when the pod restarts, it will update this monitoring CRD instead of creating a new one. func (monitor *agentMonitor) getAgentCRD() (*v1beta1.AntreaAgentInfo, error) { crdName := monitor.querier.GetNodeConfig().Name - klog.V(2).Infof("Getting agent monitoring CRD %+v", crdName) + klog.V(2).InfoS("Getting agent monitoring CRD", "name", crdName) return monitor.client.CrdV1beta1().AntreaAgentInfos().Get(context.TODO(), crdName, metav1.GetOptions{}) } -// createAgentCRD creates a new agent CRD. -func (monitor *agentMonitor) createAgentCRD() (*v1beta1.AntreaAgentInfo, error) { - agentCRD := new(v1beta1.AntreaAgentInfo) - monitor.querier.GetAgentInfo(agentCRD, false) - klog.V(2).Infof("Creating agent monitoring CRD %+v", agentCRD) - return monitor.client.CrdV1beta1().AntreaAgentInfos().Create(context.TODO(), agentCRD, metav1.CreateOptions{}) -} - // updateAgentCRD updates the monitoring CRD. 
func (monitor *agentMonitor) updateAgentCRD(partial bool) (*v1beta1.AntreaAgentInfo, error) { monitor.querier.GetAgentInfo(monitor.agentCRD, partial) diff --git a/pkg/monitor/controller.go b/pkg/monitor/controller.go index 5a2a5a9e78d..46bcc78431f 100644 --- a/pkg/monitor/controller.go +++ b/pkg/monitor/controller.go @@ -21,27 +21,55 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/apis/crd/v1beta1" clientset "antrea.io/antrea/pkg/client/clientset/versioned" + externalnodeinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + externalnodelisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" controllerquerier "antrea.io/antrea/pkg/controller/querier" ) const ( crdName = "antrea-controller" controllerName = "AntreaControllerMonitor" + // How long to wait before retrying the processing of a Node/ExternalNode change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Default number of workers processing a Node/ExternalNode change. + defaultWorkers = 4 +) + +var ( + keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc + splitKeyFunc = cache.SplitMetaNamespaceKey ) type controllerMonitor struct { client clientset.Interface nodeInformer coreinformers.NodeInformer + nodeLister corelisters.NodeLister // nodeListerSynced is a function which returns true if the node shared informer has been synced at least once. 
nodeListerSynced cache.InformerSynced - querier controllerquerier.ControllerQuerier + + externalNodeInformer externalnodeinformers.ExternalNodeInformer + externalNodeLister externalnodelisters.ExternalNodeLister + externalNodeListerSynced cache.InformerSynced + + externalNodeEnabled bool + + nodeQueue workqueue.RateLimitingInterface + externalNodeQueue workqueue.RateLimitingInterface + + querier controllerquerier.ControllerQuerier // controllerCRD is the desired state of controller monitoring CRD which controllerMonitor expects. controllerCRD *v1beta1.AntreaControllerInfo } @@ -50,37 +78,67 @@ type controllerMonitor struct { func NewControllerMonitor( client clientset.Interface, nodeInformer coreinformers.NodeInformer, + externalNodeInformer externalnodeinformers.ExternalNodeInformer, querier controllerquerier.ControllerQuerier, + externalNodeEnabled bool, ) *controllerMonitor { m := &controllerMonitor{ - client: client, - nodeInformer: nodeInformer, - nodeListerSynced: nodeInformer.Informer().HasSynced, - querier: querier, - controllerCRD: nil, + client: client, + nodeInformer: nodeInformer, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), + querier: querier, + controllerCRD: nil, + externalNodeEnabled: externalNodeEnabled, } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: nil, + AddFunc: m.enqueueNode, UpdateFunc: nil, - DeleteFunc: m.deleteStaleAgentCRD, + DeleteFunc: m.enqueueNode, }) + // Register Informer and add handlers for ExternalNode events only if the feature is enabled. 
+ if externalNodeEnabled { + m.externalNodeInformer = externalNodeInformer + m.externalNodeLister = externalNodeInformer.Lister() + m.externalNodeListerSynced = externalNodeInformer.Informer().HasSynced + m.externalNodeQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalNode") + externalNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.enqueueExternalNode, + UpdateFunc: nil, + DeleteFunc: m.enqueueExternalNode, + }) + } + return m } // Run creates AntreaControllerInfo CRD first after controller is running. // Then updates AntreaControllerInfo CRD every 60 seconds if there is any change. func (monitor *controllerMonitor) Run(stopCh <-chan struct{}) { - klog.Infof("Starting %s", controllerName) - defer klog.Infof("Shutting down %s", controllerName) + klog.InfoS("Starting", "controllerName", controllerName) + defer klog.InfoS("Shutting down", "controllerName", controllerName) - if !cache.WaitForNamedCacheSync(controllerName, stopCh, monitor.nodeListerSynced) { + cacheSyncs := []cache.InformerSynced{monitor.nodeListerSynced} + // Only wait for externalNodeListerSynced when ExternalNode feature is enabled. + if monitor.externalNodeEnabled { + cacheSyncs = append(cacheSyncs, monitor.externalNodeListerSynced) + } + if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { return } monitor.deleteStaleAgentCRDs() // Sync controller monitoring CRD every minute util stopCh is closed. 
- wait.Until(monitor.syncControllerCRD, time.Minute, stopCh) + go wait.Until(monitor.syncControllerCRD, time.Minute, stopCh) + + for i := 0; i < defaultWorkers; i++ { + go wait.Until(monitor.nodeWorker, time.Second, stopCh) + if monitor.externalNodeEnabled { + go wait.Until(monitor.externalNodeWorker, time.Second, stopCh) + } + } } func (monitor *controllerMonitor) syncControllerCRD() { @@ -89,7 +147,7 @@ func (monitor *controllerMonitor) syncControllerCRD() { if monitor.controllerCRD, err = monitor.updateControllerCRD(true); err == nil { return } - klog.Errorf("Failed to partially update controller monitoring CRD: %v", err) + klog.ErrorS(err, "Failed to partially update controller monitoring CRD") monitor.controllerCRD = nil } @@ -98,21 +156,21 @@ func (monitor *controllerMonitor) syncControllerCRD() { if errors.IsNotFound(err) { monitor.controllerCRD, err = monitor.createControllerCRD(crdName) if err != nil { - klog.Errorf("Failed to create controller monitoring CRD: %v", err) + klog.ErrorS(err, "Failed to create controller monitoring CRD") monitor.controllerCRD = nil } return } if err != nil { - klog.Errorf("Failed to get controller monitoring CRD: %v", err) + klog.ErrorS(err, "Failed to get controller monitoring CRD") monitor.controllerCRD = nil return } monitor.controllerCRD, err = monitor.updateControllerCRD(false) if err != nil { - klog.Errorf("Failed to entirely update controller monitoring CRD: %v", err) + klog.ErrorS(err, "Failed to entirely update controller monitoring CRD") monitor.controllerCRD = nil } } @@ -143,40 +201,167 @@ func (monitor *controllerMonitor) deleteStaleAgentCRDs() { ResourceVersion: "0", }) if err != nil { - klog.Errorf("Failed to list agent monitoring CRDs: %v", err) + klog.ErrorS(err, "Failed to list agent monitoring CRDs") return } - // Delete stale agent monitoring CRD based on existing nodes. 
- nodeLister := monitor.nodeInformer.Lister() + existingNames := sets.NewString() for _, crd := range crds.Items { - _, err := nodeLister.Get(crd.Name) + existingNames.Insert(crd.Name) + } + // Delete stale agent monitoring CRD based on existing Nodes and ExternalNodes. + expectedNames := sets.NewString() + nodes, err := monitor.nodeLister.List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "Failed to list nodes") + return + } + for _, node := range nodes { + expectedNames.Insert(node.Name) + } + if monitor.externalNodeEnabled { + externalNodes, err := monitor.externalNodeLister.List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "Failed to list ExternalNode CRDs") + return + } + for _, en := range externalNodes { + expectedNames.Insert(en.Name) + } + } + staleSet := existingNames.Difference(expectedNames) + for _, name := range staleSet.List() { + monitor.deleteAgentCRD(name) + } +} + +func (monitor *controllerMonitor) enqueueNode(obj interface{}) { + node := obj.(*corev1.Node) + key, _ := keyFunc(node) + monitor.nodeQueue.Add(key) +} + +func (monitor *controllerMonitor) enqueueExternalNode(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + key, _ := keyFunc(en) + monitor.externalNodeQueue.Add(key) +} + +func (n *controllerMonitor) nodeWorker() { + for n.processNextNodeWorkItem() { + } +} + +func (n *controllerMonitor) externalNodeWorker() { + for n.processNextExternalNodeWorkItem() { + } +} + +func (c *controllerMonitor) processNextNodeWorkItem() bool { + obj, quit := c.nodeQueue.Get() + if quit { + return false + } + defer c.nodeQueue.Done(obj) + + if key, ok := obj.(string); !ok { + c.nodeQueue.Forget(obj) + klog.Errorf("Expected string in Node work queue but got %#v", obj) + return true + } else if err := c.syncNode(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. 
+ c.nodeQueue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.nodeQueue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing Node", "Node", key) + } + return true +} + +func (c *controllerMonitor) processNextExternalNodeWorkItem() bool { + obj, quit := c.externalNodeQueue.Get() + if quit { + return false + } + defer c.externalNodeQueue.Done(obj) + + if key, ok := obj.(string); !ok { + c.externalNodeQueue.Forget(obj) + klog.Errorf("Expected string in ExternalNode work queue but got %#v", obj) + return true + } else if err := c.syncExternalNode(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.externalNodeQueue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.externalNodeQueue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing ExternalNode", "ExternalNode", key) + } + return true +} + +func (c *controllerMonitor) syncNode(key string) error { + _, name, err := splitKeyFunc(key) + if err != nil { + // This err should not occur. + return err + } + _, err = c.nodeLister.Get(name) + if err != nil { if errors.IsNotFound(err) { - monitor.deleteAgentCRD(crd.Name) + return c.deleteAgentCRD(name) + } else { + return err } } + return c.createAgentCRD(name) + } -func (monitor *controllerMonitor) deleteStaleAgentCRD(old interface{}) { - node, ok := old.(*corev1.Node) - if !ok { - tombstone, ok := old.(cache.DeletedFinalStateUnknown) - if !ok { - klog.Errorf("Error decoding object when deleting Node, invalid type: %v", old) - return +func (c *controllerMonitor) syncExternalNode(key string) error { + namespace, name, err := splitKeyFunc(key) + if err != nil { + // This err should not occur. 
+ return err + } + _, err = c.externalNodeLister.ExternalNodes(namespace).Get(name) + if err != nil { + if errors.IsNotFound(err) { + return c.deleteAgentCRD(name) + } else { + return err } - node, ok = tombstone.Obj.(*corev1.Node) - if !ok { - klog.Errorf("Error decoding object tombstone when deleting Node, invalid type: %v", tombstone.Obj) - return + } + return c.createAgentCRD(name) + +} + +func (monitor *controllerMonitor) createAgentCRD(name string) error { + klog.InfoS("Creating agent monitoring CRD", "name", name) + agentCRD := new(v1beta1.AntreaAgentInfo) + agentCRD.Name = name + _, err := monitor.client.CrdV1beta1().AntreaAgentInfos().Create(context.TODO(), agentCRD, metav1.CreateOptions{}) + if err != nil { + if errors.IsAlreadyExists(err) { + klog.InfoS("Skipping creating agent monitoring CRD as it already exists", "name", name) + } else { + return err } } - monitor.deleteAgentCRD(node.Name) + return nil } -func (monitor *controllerMonitor) deleteAgentCRD(name string) { - klog.Infof("Deleting agent monitoring CRD %s", name) +func (monitor *controllerMonitor) deleteAgentCRD(name string) error { + klog.InfoS("Deleting agent monitoring CRD", "name", name) err := monitor.client.CrdV1beta1().AntreaAgentInfos().Delete(context.TODO(), name, metav1.DeleteOptions{}) if err != nil { - klog.Errorf("Failed to delete agent monitoring CRD %s: %v", name, err) + if errors.IsNotFound(err) { + klog.InfoS("Skipping deleting agent monitoring CRD as it is not found", "name", name) + } else { + return err + } } + return nil }