From 004ba822f33dfe6698810890596dba5e7962e545 Mon Sep 17 00:00:00 2001 From: Mengdie Song Date: Tue, 10 May 2022 17:37:10 +0800 Subject: [PATCH] [ExternalNode] Handle ExternalNode from Antrea agent side 1. Provide an example RBAC yaml file for Antrea agent running on VM with definitions of ClusterRole, ServiceAccount and ClusterRoleBinding. 2. Add ExternalNodeController to monitor ExternalNode CRUD, invoke interfaces to operate OVS and update interface store with ExternalEntityInterface. 3. Define the interfaces for OVS operations. 4. Add a channel for receiving ExternalNode updates from ExternalNodeController and notifying NetworkPolicyController to reconcile rules related to the updated ExternalNodes. This is to handle the case when NetworkPolicyController reconciles rules before ExternalEntityInterface is realized in the interface store. 5. Update NetworkPolicy reconciler to invoke GetInterfacesByEntity() and GetContainerInterfacesByPod() for ExternalEntity and Pod separately. Signed-off-by: Mengdie Song --- build/yamls/externalnode/vm-agent-rbac.yml | 101 +++++ cmd/antrea-agent/agent.go | 24 +- cmd/antrea-agent/options.go | 3 + pkg/agent/agent.go | 15 +- pkg/agent/controller/networkpolicy/cache.go | 38 +- .../controller/networkpolicy/cache_test.go | 2 +- .../networkpolicy/networkpolicy_controller.go | 3 +- .../networkpolicy_controller_test.go | 2 +- .../controller/networkpolicy/reconciler.go | 28 +- .../networkpolicy/status_controller_test.go | 2 +- .../externalnode/external_node_controller.go | 413 ++++++++++++++++++ pkg/agent/interfacestore/interface_cache.go | 30 +- pkg/agent/interfacestore/types.go | 10 + pkg/config/agent/config.go | 4 + pkg/util/externalnode/externalnode.go | 31 ++ 15 files changed, 677 insertions(+), 29 deletions(-) create mode 100644 build/yamls/externalnode/vm-agent-rbac.yml create mode 100644 pkg/agent/externalnode/external_node_controller.go create mode 100644 pkg/util/externalnode/externalnode.go diff --git 
a/build/yamls/externalnode/vm-agent-rbac.yml b/build/yamls/externalnode/vm-agent-rbac.yml new file mode 100644 index 00000000000..7a8e963b88e --- /dev/null +++ b/build/yamls/externalnode/vm-agent-rbac.yml @@ -0,0 +1,101 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vm-agent + namespace: vm-demo # Change the namespace to where vm-agent is expected to run. +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +rules: + # antrea-controller distributes the CA certificate as a ConfigMap named `antrea-ca` in the Antrea deployment Namespace. + # vm-agent needs to access `antrea-ca` to connect with antrea-controller. + - apiGroups: + - "" + resources: + - configmaps + resourceNames: + - antrea-ca + verbs: + - get + - watch + - list + # This is the content of built-in role kube-system/extension-apiserver-authentication-reader. + # But it doesn't have list/watch permission before K8s v1.17.0 so the extension apiserver (vm-agent) will + # have permission issue after bumping up apiserver library to a version that supports dynamic authentication. + # See https://github.com/kubernetes/kubernetes/pull/85375 + # To support K8s clusters older than v1.17.0, we grant the required permissions directly instead of relying on + # the extension-apiserver-authentication role. + - apiGroups: + - "" + resourceNames: + - extension-apiserver-authentication + resources: + - configmaps + verbs: + - get + - list + - watch + - apiGroups: + - crd.antrea.io + resourceNames: + - VM1 # Change the ExternalNode name which vm-agent is expected to update. 
+ resources: + - antreaagentinfos + verbs: + - get + - update + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies + - appliedtogroups + - addressgroups + verbs: + - get + - watch + - list + - apiGroups: + - controlplane.antrea.io + resources: + - nodestatssummaries + verbs: + - create + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies/status + verbs: + - create + - get + - apiGroups: + - crd.antrea.io + resources: + - externalentities + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - externalnodes + verbs: + - get + - watch + - list +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: vm-agent +subjects: + - kind: ServiceAccount + name: vm-agent + namespace: vm-demo # Change the namespace to where vm-agent is expected to run. \ No newline at end of file diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 32a4f6b9574..13874753167 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -38,6 +38,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" @@ -277,7 +278,16 @@ func run(o *Options) error { // podUpdateChannel is a channel for receiving Pod updates from CNIServer and // notifying NetworkPolicyController and EgressController to reconcile rules // related to the updated Pods. 
- podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + var podUpdateChannel *channel.SubscribableChannel + // externalNodeUpdateChannel is a channel for receiving ExternalNode updates from ExternalNodeController and + // notifying NetworkPolicyController to reconcile rules related to the updated ExternalNodes. + var externalNodeUpdateChannel *channel.SubscribableChannel + if o.nodeType == config.K8sNode { + podUpdateChannel = channel.NewSubscribableChannel("PodUpdate", 100) + } else { + externalNodeUpdateChannel = channel.NewSubscribableChannel("ExternalNodeUpdate", 100) + } + // We set flow poll interval as the time interval for rule deletion in the async // rule cache, which is implemented as part of the idAllocator. This is to preserve // the rule info for populating NetworkPolicy fields in the Flow Exporter even @@ -296,6 +306,7 @@ func run(o *Options) error { ifaceStore, nodeConfig.Name, podUpdateChannel, + externalNodeUpdateChannel, groupCounters, groupIDUpdates, antreaPolicyEnabled, @@ -367,6 +378,7 @@ func run(o *Options) error { var cniServer *cniserver.CNIServer var cniPodInfoStore cnipodcache.CNIPodInfoStore + var externalNodeController *externalnode.ExternalNodeController if o.nodeType == config.K8sNode { isChaining := false if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { @@ -395,6 +407,13 @@ func run(o *Options) error { return fmt.Errorf("error initializing CNI server: %v", err) } } + } else { + externalNodeInformer := crdInformerFactory.Crd().V1alpha1().ExternalNodes() + externalNodeController, err = externalnode.NewExternalNodeController(ovsBridgeClient, ofClient, externalNodeInformer, + ifaceStore, externalNodeUpdateChannel, o.config.Namespace) + if err != nil { + return fmt.Errorf("error creating ExternalNode controller: %v", err) + } } var traceflowController *traceflow.Controller @@ -487,6 +506,9 @@ func run(o *Options) error { go podUpdateChannel.Run(stopCh) go cniServer.Run(stopCh) go nodeRouteController.Run(stopCh) 
+ } else { + go externalNodeUpdateChannel.Run(stopCh) + go externalNodeController.Run(stopCh) } go antreaClientProvider.Run(stopCh) diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index f659d158c63..46c8b60e1a3 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -452,4 +452,7 @@ func (o *Options) setExternalNodeDefaultOptions() { o.config.EnablePrometheusMetrics = new(bool) *o.config.EnablePrometheusMetrics = false } + if o.config.Namespace == "" { + o.config.Namespace = "default" + } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 335cd580327..0537a425413 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/noderoute" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -275,9 +276,17 @@ func (i *Initializer) initInterfaceStore() error { case interfacestore.AntreaTunnel: intf = parseTunnelInterfaceFunc(port, ovsPort) case interfacestore.AntreaHost: - // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a - // specific interface in the interfaceStore. - intf = nil + if port.Name == i.ovsBridge { + // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a + // specific interface in the interfaceStore. + intf = nil + } else { + var err error + intf, err = externalnode.ParseHostInterfaceConfig(i.ovsBridgeClient, port, ovsPort) + if err != nil { + return err + } + } case interfacestore.AntreaContainer: // The port should be for a container interface. 
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true) diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index e0fc264f260..145189629ff 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -33,6 +33,7 @@ import ( crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/externalnode" "antrea.io/antrea/pkg/util/k8s" ) @@ -336,7 +337,8 @@ func toServicesIndexFunc(obj interface{}) ([]string, error) { } // newRuleCache returns a new *ruleCache. -func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache { +func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, externalNodeUpdateSubscriber channel.Subscriber, + serviceGroupIDUpdate <-chan string) *ruleCache { rules := cache.NewIndexer( ruleKeyFunc, cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc, toServicesIndex: toServicesIndexFunc}, @@ -350,7 +352,14 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub groupIDUpdates: serviceGroupIDUpdate, } // Subscribe Pod update events from CNIServer. - podUpdateSubscriber.Subscribe(cache.processPodUpdate) + if podUpdateSubscriber != nil { + podUpdateSubscriber.Subscribe(cache.processPodUpdate) + } + // Subscribe ExternalNode update events from ExternalNodeController + if externalNodeUpdateSubscriber != nil { + externalNodeUpdateSubscriber.Subscribe(cache.processExternalNodeUpdate) + } + go cache.processGroupIDUpdates() return cache } @@ -379,6 +388,31 @@ func (c *ruleCache) processPodUpdate(e interface{}) { } } +// processExternalNodeUpdate will be called when ExternalNodeController publishes an ExternalNode update event. 
+// It finds out AppliedToGroups that contains this ExternalNode converted ExternalEntity and trigger reconciling +// of related rules. +// It can enforce NetworkPolicies to ExternalEntities after ExternalEntityInterface is realised in the interface store. +func (c *ruleCache) processExternalNodeUpdate(e interface{}) { + externalNode := e.(*crdv1alpha1.ExternalNode) + eeName := externalnode.GenExternalEntityName(externalNode) + c.appliedToSetLock.RLock() + defer c.appliedToSetLock.RUnlock() + externalEntityEquals := func(ee *v1beta.ExternalEntityReference, namespace string, name string) bool { + if ee.Namespace == namespace && ee.Name == name { + return true + } + return false + } + for group, memberSet := range c.appliedToSetByGroup { + for _, member := range memberSet.Items() { + if externalEntityEquals(member.ExternalEntity, externalNode.Namespace, eeName) { + c.onAppliedToGroupUpdate(group) + break + } + } + } +} + // processGroupIDUpdates is an infinite loop that takes Service groupID // update events from the channel, finds out rules that refer this Service in // ToServices field and use dirtyRuleHandler to re-queue these rules. 
diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 00c01d3ed25..2c99b9cb4f9 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -278,7 +278,7 @@ func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, *channel.SubscribableCh recorder := newDirtyRuleRecorder() podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) ch2 := make(chan string, 100) - c := newRuleCache(recorder.Record, podUpdateChannel, ch2) + c := newRuleCache(recorder.Record, podUpdateChannel, nil, ch2) return c, recorder, podUpdateChannel } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index c0c06ccfd70..f068b2b04c9 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -114,6 +114,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ifaceStore interfacestore.InterfaceStore, nodeName string, podUpdateSubscriber channel.Subscriber, + externalNodeUpdateSubscriber channel.Subscriber, groupCounters []proxytypes.GroupCounter, groupIDUpdates <-chan string, antreaPolicyEnabled bool, @@ -144,7 +145,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } } c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled) - c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates) + c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalNodeUpdateSubscriber, groupIDUpdates) if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go 
b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index b7f54207c57..7a5c6eda84c 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -58,7 +58,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator(false) groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, groupCounters, ch2, + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53", true, false) reconciler := newMockReconciler() controller.reconciler = reconciler diff --git a/pkg/agent/controller/networkpolicy/reconciler.go b/pkg/agent/controller/networkpolicy/reconciler.go index 16cbf78484b..77849cf60d4 100644 --- a/pkg/agent/controller/networkpolicy/reconciler.go +++ b/pkg/agent/controller/networkpolicy/reconciler.go @@ -873,20 +873,22 @@ func (r *reconciler) GetRuleByFlowID(ruleFlowID uint32) (*types.PolicyRule, bool func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 { ofPorts := sets.NewInt32() for _, m := range members { - var entityName, ns string + var ifaces []*interfacestore.InterfaceConfig + var name, ns string if m.Pod != nil { - entityName, ns = m.Pod.Name, m.Pod.Namespace + name, ns = m.Pod.Name, m.Pod.Namespace + ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns) } else if m.ExternalEntity != nil { - entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns) } - ifaces := r.ifaceStore.GetInterfacesByEntity(entityName, ns) if 
len(ifaces) == 0 { // This might be because the container has been deleted during realization or hasn't been set up yet. - klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName) + klog.Infof("Can't find interface for %s/%s, skipping", ns, name) continue } for _, iface := range ifaces { - klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, entityName) + klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, name) ofPorts.Insert(iface.OFPort) } } @@ -896,22 +898,24 @@ func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 { func (r *reconciler) getIPs(members v1beta2.GroupMemberSet) sets.String { ips := sets.NewString() for _, m := range members { - var entityName, ns string + var ifaces []*interfacestore.InterfaceConfig + var name, ns string if m.Pod != nil { - entityName, ns = m.Pod.Name, m.Pod.Namespace + name, ns = m.Pod.Name, m.Pod.Namespace + ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns) } else if m.ExternalEntity != nil { - entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns) } - ifaces := r.ifaceStore.GetInterfacesByEntity(entityName, ns) if len(ifaces) == 0 { // This might be because the container has been deleted during realization or hasn't been set up yet. 
- klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName) + klog.Infof("Can't find interface for %s/%s, skipping", ns, name) continue } for _, iface := range ifaces { for _, ipAddr := range iface.IPs { if ipAddr != nil { - klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, entityName) + klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, name) ips.Insert(ipAddr.String()) } } diff --git a/pkg/agent/controller/networkpolicy/status_controller_test.go b/pkg/agent/controller/networkpolicy/status_controller_test.go index d719e8baa10..b92af101e8c 100644 --- a/pkg/agent/controller/networkpolicy/status_controller_test.go +++ b/pkg/agent/controller/networkpolicy/status_controller_test.go @@ -51,7 +51,7 @@ func (c *fakeNetworkPolicyControl) getNetworkPolicyStatus() *v1beta2.NetworkPoli } func newTestStatusController() (*StatusController, *ruleCache, *fakeNetworkPolicyControl) { - ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), make(chan string, 100)) + ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), nil, make(chan string, 100)) statusControl := &fakeNetworkPolicyControl{} statusController := newStatusController(nil, testNode1, ruleCache) statusController.statusControlInterface = statusControl diff --git a/pkg/agent/externalnode/external_node_controller.go b/pkg/agent/externalnode/external_node_controller.go new file mode 100644 index 00000000000..4b849d52aa1 --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller.go @@ -0,0 +1,413 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import ( + "fmt" + "reflect" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + eninformer "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + enlister "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/env" + "antrea.io/antrea/pkg/util/externalnode" + "antrea.io/antrea/pkg/util/k8s" +) + +const ( + controllerName = "ExternalNodeController" + // How long to wait before retrying the processing of an ExternalNode change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Default number of workers processing ExternalNode changes. + defaultWorkers = 4 + // Disable resyncing. 
+ resyncPeriod time.Duration = 0 +) + +var ( + keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc + splitKeyFunc = cache.SplitMetaNamespaceKey +) + +type ExternalNodeController struct { + ovsBridgeClient ovsconfig.OVSBridgeClient + ofClient openflow.Client + externalNodeInformer cache.SharedIndexInformer + externalNodeLister enlister.ExternalNodeNamespaceLister + externalNodeListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + ifaceStore interfacestore.InterfaceStore + syncedExternalNodes cache.Store + // nodeUpdateNotifier is used for notifying updates of local ExternalNode to NetworkPolicyController. + nodeUpdateNotifier channel.Notifier + nodeName string + namespace string +} + +func NewExternalNodeController(ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, externalNodeInformer eninformer.ExternalNodeInformer, + ifaceStore interfacestore.InterfaceStore, nodeUpdateNotifier channel.Notifier, namespace string) (*ExternalNodeController, error) { + c := &ExternalNodeController{ + ovsBridgeClient: ovsBridgeClient, + ofClient: ofClient, + externalNodeInformer: externalNodeInformer.Informer(), + externalNodeLister: externalNodeInformer.Lister().ExternalNodes(namespace), + externalNodeListerSynced: externalNodeInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalNode"), + ifaceStore: ifaceStore, + syncedExternalNodes: cache.NewStore(keyFunc), + nodeUpdateNotifier: nodeUpdateNotifier, + } + nodeName, err := env.GetNodeName() + if err != nil { + return nil, err + } + c.nodeName = nodeName + c.externalNodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueExternalNodeAdd, + UpdateFunc: c.enqueueExternalNodeUpdate, + DeleteFunc: c.enqueueExternalNodeDelete, + }, + resyncPeriod) + + return c, nil +} + +func (c *ExternalNodeController) enqueueExternalNodeAdd(obj interface{}) { + 
en := obj.(*v1alpha1.ExternalNode) + if en.Name != c.nodeName || en.Namespace != c.namespace { + return + } + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode ADD event", "ExternalNode", klog.KObj(en)) +} + +func (c *ExternalNodeController) enqueueExternalNodeUpdate(oldObj interface{}, newObj interface{}) { + oldEn := oldObj.(*v1alpha1.ExternalNode) + newEn := newObj.(*v1alpha1.ExternalNode) + if newEn.Name != c.nodeName || newEn.Namespace != c.namespace { + return + } + if reflect.DeepEqual(oldEn.Spec.Interfaces, newEn.Spec.Interfaces) { + klog.InfoS("Skip enqueuing ExternalNode UPDATE event as no changes for interfaces", "ExternalNode", klog.KObj(newEn)) + return + } + key, _ := keyFunc(newEn) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode UPDATE event", "ExternalNode", klog.KObj(newEn)) +} + +func (c *ExternalNodeController) enqueueExternalNodeDelete(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + if en.Name != c.nodeName || en.Namespace != c.namespace { + return + } + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode DELETE event", "ExternalNode", klog.KObj(en)) +} + +// Run will create defaultWorkers workers (goroutines) which will process the ExternalNode events from the work queue. 
+func (c *ExternalNodeController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.Infof("Starting %s", controllerName) + defer klog.Infof("Shutting down %s", controllerName) + + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.externalNodeListerSynced) { + klog.Error("Failed to wait for syncing cache for ExternalNodes") + return + } + + if err := c.reconcile(); err != nil { + klog.Errorf("Failed to reconcile %v", err) + return + } + + for i := 0; i < defaultWorkers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh +} + +func (c *ExternalNodeController) reconcile() error { + klog.Info("ExternalNodeController starts reconciliation") + if err := c.reconcileHostUplinkFlows(); err != nil { + return fmt.Errorf("failed to reconcile host uplink flows %v", err) + } + if err := c.reconcileExternalEntityInterfaces(); err != nil { + return fmt.Errorf("failed to reconcile ExternalEntity interfaces %v", err) + } + klog.Info("ExternalNodeController finishes reconciliation") + return nil +} + +// TODO: Install host uplink flows +func (c *ExternalNodeController) reconcileHostUplinkFlows() error { + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + // TODO + klog.InfoS("Reconcile host uplink flow for ExternalEntityInterface", "ifName", hostIface.InterfaceName) + } + return nil +} + +func (c *ExternalNodeController) reconcileExternalEntityInterfaces() error { + en, err := c.externalNodeLister.Get(c.nodeName) + if err != nil { + return err + } + if err = c.addExternalNode(k8s.NamespacedName(en.Namespace, en.Name), en); err != nil { + return err + } + eeName := externalnode.GenExternalEntityName(en) + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if hostIface.EntityName != eeName || hostIface.EntityNamespace != en.Namespace { + if err = c.deleteInterface(hostIface.InterfaceName); 
err != nil { + return err + } + } + } + return nil +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the work queue. +func (c *ExternalNodeController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *ExternalNodeController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + if key, ok := obj.(string); !ok { + c.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncExternalNode(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.Errorf("Error syncing ExternalNode %s, requeuing. Error: %v", key, err) + } + return true +} + +func (c *ExternalNodeController) syncExternalNode(key string) error { + _, name, err := splitKeyFunc(key) + if err != nil { + // This err should not occur. 
+ return err + } + en, err := c.externalNodeLister.Get(name) + if errors.IsNotFound(err) { + return c.deleteExternalNode(key) + } + preEn, exists, _ := c.syncedExternalNodes.GetByKey(key) + if !exists { + return c.addExternalNode(key, en) + } else { + return c.updateExternalNode(key, preEn, en) + } +} + +func (c *ExternalNodeController) addExternalNode(key string, en *v1alpha1.ExternalNode) error { + klog.InfoS("Adding ExternalNode", "ExternalNode", klog.KObj(en)) + if len(en.Spec.Interfaces) == 0 { + klog.InfoS("Skipping handling ExternalNode add as there are no interfaces defined", "ExternalNode", klog.KObj(en)) + return nil + } + eeName := externalnode.GenExternalEntityName(en) + ifName, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.addInterface(ifName, en.Namespace, eeName, en.Spec.Interfaces[0].IPs); err != nil { + return err + } + c.syncedExternalNodes.Add(en) + // Notify the ExternalNode create event to NetworkPolicyController. 
+ c.nodeUpdateNotifier.Notify(en) + return nil +} + +func (c *ExternalNodeController) addInterface(ifName string, eeNamespace string, eeName string, ips []string) error { + hostIface, portExists := c.ifaceStore.GetInterfaceByName(ifName) + if !portExists { + klog.InfoS("Creating OVS ports and flows for ExternalEntityInterface", "ifName", ifName, "eeName", eeName, "ip", ips) + uplinkName := genUplinkInterfaceName(ifName) + iface, err := c.createOVSPortsAndFlows(uplinkName, ifName, eeNamespace, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil + } + klog.InfoS("Updating OVS port data", "ifName", ifName, "eeName", eeName, "ip", ips) + iface, err := c.updateOVSPortsAndFlows(hostIface, eeNamespace, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil +} + +func (c *ExternalNodeController) updateExternalNode(key string, obj interface{}, curEn *v1alpha1.ExternalNode) error { + klog.InfoS("Updating ExternalNode", "ExternalNode", klog.KObj(curEn)) + preEn := obj.(*v1alpha1.ExternalNode) + // ExternalNode whose interfaces which are always empty will not be enqueued. + // The update case can be divided to the following 3 cases. 
+ if len(preEn.Spec.Interfaces) == 0 && len(curEn.Spec.Interfaces) > 0 { + klog.InfoS("Updating ExternalNode with added interfaces", "newInterfaces", curEn.Spec.Interfaces) + eeName := externalnode.GenExternalEntityName(curEn) + ifName, err := getHostInterfaceName(curEn.Spec.Interfaces[0]) + if err != nil { + return err + } + if err = c.addInterface(ifName, curEn.Namespace, eeName, curEn.Spec.Interfaces[0].IPs); err != nil { + return err + } + } else if len(preEn.Spec.Interfaces) > 0 && len(curEn.Spec.Interfaces) == 0 { + klog.InfoS("Updating ExternalNode with removed interfaces", "oldInterfaces", preEn.Spec.Interfaces) + ifName, err := getHostInterfaceName(preEn.Spec.Interfaces[0]) + if err != nil { + return err + } + if err = c.deleteInterface(ifName); err != nil { + return err + } + } else { + klog.InfoS("Updating ExternalNode with changed interfaces", "oldInterfaces", preEn.Spec.Interfaces, "curInterfaces", curEn.Spec.Interfaces) + preEEName := externalnode.GenExternalEntityName(preEn) + preIfName, err := getHostInterfaceName(preEn.Spec.Interfaces[0]) + if err != nil { + return err + } + curEEName := externalnode.GenExternalEntityName(curEn) + curIfName, err := getHostInterfaceName(curEn.Spec.Interfaces[0]) + if err != nil { + return err + } + if preIfName != curIfName { + klog.InfoS("Found interface name is changed", "preName", preIfName, "curName", curIfName) + if err = c.deleteInterface(preIfName); err != nil { + return err + } + if err = c.addInterface(curIfName, curEn.Namespace, curEEName, curEn.Spec.Interfaces[0].IPs); err != nil { + return err + } + } else if (!reflect.DeepEqual(preEn.Spec.Interfaces[0].IPs, curEn.Spec.Interfaces[0].IPs)) || (preEEName != curEEName) { + klog.InfoS("Found interface configuration is changed", "preIPs", preEn.Spec.Interfaces[0].IPs, "preEEName", preEEName, + "curIPs", curEn.Spec.Interfaces[0].IPs, "curEEName", curEEName) + if err = c.addInterface(curIfName, curEn.Namespace, curEEName, curEn.Spec.Interfaces[0].IPs); err != 
nil { + return err + } + } + } + + c.syncedExternalNodes.Add(curEn) + // Notify the ExternalNode update event to NetworkPolicyController. + c.nodeUpdateNotifier.Notify(curEn) + return nil +} + +func (c *ExternalNodeController) deleteExternalNode(key string) error { + klog.InfoS("Deleting ExternalNode", "key", key) + obj, exists, _ := c.syncedExternalNodes.GetByKey(key) + if !exists { + klog.InfoS("Skipping ExternalNode deletion as it hasn't been synced", "ExternalEntityKey", key) + return nil + } + en := obj.(*v1alpha1.ExternalNode) + if len(en.Spec.Interfaces) == 0 { + klog.InfoS("Skipping handling ExternalNode deletion as there are no interfaces defined", "ExternalNode", klog.KObj(en)) + return nil + } + ifName, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.deleteInterface(ifName); err != nil { + return err + } + c.syncedExternalNodes.Delete(en) + return nil +} + +func (c *ExternalNodeController) deleteInterface(ifName string) error { + hostIface, portExists := c.ifaceStore.GetInterfaceByName(ifName) + if !portExists { + klog.InfoS("Skipping deleting host interface since it doesn't exist ", "ifName", ifName) + return nil + } + klog.InfoS("Deleting interface", "ifName", ifName) + if err := c.removeOVSPortsAndFlows(hostIface); err != nil { + return err + } + c.ifaceStore.DeleteInterface(hostIface) + return nil +} + +// TODO: Implement createOVSPortsAndFlows +func (c *ExternalNodeController) createOVSPortsAndFlows(uplinkName, hostIfName, eeNamespace string, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + return &interfacestore.InterfaceConfig{}, nil +} + +// TODO: Implement updateOVSPortsAndFlows +func (c *ExternalNodeController) updateOVSPortsAndFlows(interfaceConfig *interfacestore.InterfaceConfig, eeNamespace string, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + return &interfacestore.InterfaceConfig{}, nil +} + +// TODO: Implement removeOVSPortsAndFlows 
+func (a *ExternalNodeController) removeOVSPortsAndFlows(interfaceConfig *interfacestore.InterfaceConfig) error { + return nil +} + +// TODO: Identify interface by interface IP +func getHostInterfaceName(iface v1alpha1.NetworkInterface) (string, error) { + + return "", nil +} + +// TODO: Implement ParseHostInterfaceConfig which will be used by initInterfaceStore +func ParseHostInterfaceConfig(ovsBridgeClient ovsconfig.OVSBridgeClient, portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) (*interfacestore.InterfaceConfig, error) { + return nil, nil +} + +func genUplinkInterfaceName(hostIfName string) string { + return fmt.Sprintf("phy-%s", hostIfName) +} diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index 0045d29f4b7..221a42656d7 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -42,6 +42,8 @@ const ( interfaceIPIndex = "ip" // ofPortIndex is the index built with InterfaceConfig.OFPort ofPortIndex = "ofPort" + + externalEntityIndex = "externalEntity" ) // Local cache for interfaces created on node, including container, host gateway, and tunnel @@ -172,7 +174,12 @@ func (c *interfaceCache) GetContainerInterface(containerID string) (*InterfaceCo } func (c *interfaceCache) GetInterfacesByEntity(name, namespace string) []*InterfaceConfig { - return c.GetContainerInterfacesByPod(name, namespace) + objs, _ := c.cache.ByIndex(externalEntityIndex, k8s.NamespacedName(namespace, name)) + interfaces := make([]*InterfaceConfig, len(objs)) + for i := range objs { + interfaces[i] = objs[i].(*InterfaceConfig) + } + return interfaces } // GetContainerInterfacesByPod retrieves InterfaceConfigs for the Pod. 
@@ -256,15 +263,30 @@ func interfaceOFPortIndexFunc(obj interface{}) ([]string, error) {
 	return []string{fmt.Sprintf("%d", interfaceConfig.OFPort)}, nil
 }
 
+// externalEntityIndexFunc indexes an InterfaceConfig of type
+// ExternalEntityInterface by the namespaced name of its entity. Any other
+// InterfaceConfig, or one whose EntityInterfaceConfig is nil, gets no index
+// value.
+func externalEntityIndexFunc(obj interface{}) ([]string, error) {
+	interfaceConfig := obj.(*InterfaceConfig)
+	// EntityNamespace and EntityName are promoted from the embedded
+	// *EntityInterfaceConfig, so reading them through a nil pointer would panic.
+	if interfaceConfig.Type != ExternalEntityInterface || interfaceConfig.EntityInterfaceConfig == nil {
+		return []string{}, nil
+	}
+	return []string{k8s.NamespacedName(interfaceConfig.EntityNamespace, interfaceConfig.EntityName)}, nil
+}
+
 func NewInterfaceStore() InterfaceStore {
 	return &interfaceCache{
 		cache: cache.NewIndexer(getInterfaceKey, cache.Indexers{
-			interfaceNameIndex: interfaceNameIndexFunc,
-			interfaceTypeIndex: interfaceTypeIndexFunc,
-			containerIDIndex:   containerIDIndexFunc,
-			podIndex:           podIndexFunc,
-			interfaceIPIndex:   interfaceIPIndexFunc,
-			ofPortIndex:        interfaceOFPortIndexFunc,
+			interfaceNameIndex:  interfaceNameIndexFunc,
+			interfaceTypeIndex:  interfaceTypeIndexFunc,
+			containerIDIndex:    containerIDIndexFunc,
+			podIndex:            podIndexFunc,
+			interfaceIPIndex:    interfaceIPIndexFunc,
+			ofPortIndex:         interfaceOFPortIndexFunc,
+			externalEntityIndex: externalEntityIndexFunc,
 		}),
 	}
 }
diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go
index a443dd6dd6d..ce3718b1d53 100644
--- a/pkg/agent/interfacestore/types.go
+++ b/pkg/agent/interfacestore/types.go
@@ -33,6 +33,8 @@ const (
 	UplinkInterface
 	// HostInterface is used to mark current interface is for host
 	HostInterface
+	// ExternalEntityInterface is used to mark current interface is for ExternalEntity Endpoint
+	ExternalEntityInterface
 
 	AntreaInterfaceTypeKey = "antrea-type"
 	AntreaGateway          = "gateway"
@@ -74,6 +76,19 @@ type TunnelInterfaceConfig struct {
 	Csum bool
 }
 
+// EntityInterfaceConfig is the part of an InterfaceConfig that identifies the
+// entity an ExternalEntityInterface belongs to; it also carries an uplink port
+// and a host interface index.
+type EntityInterfaceConfig struct {
+	// EntityName and EntityNamespace together form the externalEntity index key.
+	EntityName      string
+	EntityNamespace string
+	// NOTE(review): presumably the OVS port config of the paired uplink — confirm.
+	UplinkPort *OVSPortConfig
+	// NOTE(review): presumably the OS index of the host interface — confirm.
+	HostIfaceIndex int
+}
+
 type InterfaceConfig struct {
 	Type InterfaceType
 	// Unique name of the interface, also used for the OVS port name.
@@ -85,6 +94,7 @@ type InterfaceConfig struct {
 	*OVSPortConfig
 	*ContainerInterfaceConfig
 	*TunnelInterfaceConfig
+	*EntityInterfaceConfig
 }
 
 // InterfaceStore is a service interface to create local interfaces for container, host gateway, and tunnel port.
diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go
index 34be6b2c635..745c212255c 100644
--- a/pkg/config/agent/config.go
+++ b/pkg/config/agent/config.go
@@ -192,6 +192,10 @@ type AgentConfig struct {
 	// NodeType is type of the Node where Antrea Agent is running.
 	// Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode".
 	NodeType string `yaml:"nodeType,omitempty"`
+	// Namespace is the namespace in which the ExternalNode for the VM/BM object is created.
+	// The default value is "default".
+	// It is used only when NodeType is externalNode.
+	Namespace string `yaml:"namespace,omitempty"`
 }
 
 type AntreaProxyConfig struct {
diff --git a/pkg/util/externalnode/externalnode.go b/pkg/util/externalnode/externalnode.go
new file mode 100644
index 00000000000..b1b8077a57f
--- /dev/null
+++ b/pkg/util/externalnode/externalnode.go
@@ -0,0 +1,34 @@
+// Copyright 2022 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package externalnode
+
+import "antrea.io/antrea/pkg/apis/crd/v1alpha1"
+
+// GenExternalEntityName builds a name from externalNode and its first network
+// interface. It returns "" when externalNode defines no interfaces, the
+// ExternalNode name when the first interface has an empty name, and
+// "<ExternalNode name>-<interface name>" otherwise.
+func GenExternalEntityName(externalNode *v1alpha1.ExternalNode) string {
+	if len(externalNode.Spec.Interfaces) == 0 {
+		return ""
+	}
+	// Only one network interface is supported now.
+	// Interfaces other than Interfaces[0] are ignored if there is more than one.
+	ifName := externalNode.Spec.Interfaces[0].Name
+	if ifName == "" {
+		return externalNode.Name
+	}
+	return externalNode.Name + "-" + ifName
+}