diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 1814cdeb34a..0527661c0eb 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -53,9 +53,6 @@ featureGates: # Enable certificated-based authentication for IPsec. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "IPsecCertAuth" "default" false) }} -# Enable running agent on an unmanaged VM/BM. -{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "ExternalNode" "default" false) }} - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index edcebc90eaa..b11e67b05f5 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-agent @@ -3888,7 +3885,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 296f6a05dae..141063bdb35 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-agent @@ -3890,7 +3887,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index ac0381c09d8..a60c560afb2 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7cbfbb91897e3ebdbb7914a2e6831cef47ef08e6104c00aca6984bf32bd0e022 + checksum/config: d0d5d7f0e4e920f762944fdd78e439ec68af61926a68f7f7f49f8f1ab0ac7fa6 labels: app: antrea component: antrea-agent @@ -3888,7 +3885,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7cbfbb91897e3ebdbb7914a2e6831cef47ef08e6104c00aca6984bf32bd0e022 + checksum/config: d0d5d7f0e4e920f762944fdd78e439ec68af61926a68f7f7f49f8f1ab0ac7fa6 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 9b65fbc232e..40eb9c46fce 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -124,9 +124,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3661,7 +3658,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 32f6800db6e4eaf5f97ee1d2135b07b44a96fc83bbd8f846b7f8fb616414edc6 + checksum/config: 50764285ac4a299fe772f50f3e5f8b1ff320285f61e0e907353805fe881a06d7 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -3947,7 +3944,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 32f6800db6e4eaf5f97ee1d2135b07b44a96fc83bbd8f846b7f8fb616414edc6 + checksum/config: 50764285ac4a299fe772f50f3e5f8b1ff320285f61e0e907353805fe881a06d7 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index d3b601f57de..3dbe4e0aea6 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d729415bf3a9dafaff0d537bbc7104b4a8e672959f7804e342ef25d586747e83 + checksum/config: 18b98dbb7837e8da7cebaf3c0cb35758708fd2372bc02f73aca86e01539f93a2 labels: app: antrea component: antrea-agent @@ -3888,7 +3885,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d729415bf3a9dafaff0d537bbc7104b4a8e672959f7804e342ef25d586747e83 + checksum/config: 18b98dbb7837e8da7cebaf3c0cb35758708fd2372bc02f73aca86e01539f93a2 labels: app: antrea component: antrea-controller diff --git a/build/yamls/externalnode/conf/antrea-agent.conf b/build/yamls/externalnode/conf/antrea-agent.conf index 55e8015bfa2..19b66ed85e4 100644 --- a/build/yamls/externalnode/conf/antrea-agent.conf +++ b/build/yamls/externalnode/conf/antrea-agent.conf @@ -32,6 +32,10 @@ featureGates: # Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". nodeType: externalNode +# Namespace is the expected namespace to create the ExternalNode for the VM/BM object. +# Defaults to "default". +#namespace: default + # The path to access the kubeconfig file used in the connection to K8s APIServer. The file contains the K8s # APIServer endpoint and the token of ServiceAccount required in the connection. clientConnection: diff --git a/build/yamls/externalnode/vm-agent-rbac.yml b/build/yamls/externalnode/vm-agent-rbac.yml new file mode 100644 index 00000000000..49d0c7ab856 --- /dev/null +++ b/build/yamls/externalnode/vm-agent-rbac.yml @@ -0,0 +1,101 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vm-agent + namespace: test-ns # Change the namespace to where vm-agent is expected to run. +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +rules: + # antrea-controller distributes the CA certificate as a ConfigMap named `antrea-ca` in the Antrea deployment Namespace. + # vm-agent needs to access `antrea-ca` to connect with antrea-controller. + - apiGroups: + - "" + resources: + - configmaps + resourceNames: + - antrea-ca + verbs: + - get + - watch + - list + # This is the content of built-in role kube-system/extension-apiserver-authentication-reader. + # But it doesn't have list/watch permission before K8s v1.17.0 so the extension apiserver (vm-agent) will + # have permission issue after bumping up apiserver library to a version that supports dynamic authentication. + # See https://github.com/kubernetes/kubernetes/pull/85375 + # To support K8s clusters older than v1.17.0, we grant the required permissions directly instead of relying on + # the extension-apiserver-authentication role. + - apiGroups: + - "" + resourceNames: + - extension-apiserver-authentication + resources: + - configmaps + verbs: + - get + - list + - watch + - apiGroups: + - crd.antrea.io + resourceNames: + - VM1 # Change the ExternalNode name which vm-agent is expected to update. + resources: + - antreaagentinfos + verbs: + - get + - update + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies + - appliedtogroups + - addressgroups + verbs: + - get + - watch + - list + - apiGroups: + - controlplane.antrea.io + resources: + - nodestatssummaries + verbs: + - create + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies/status + verbs: + - create + - get + - apiGroups: + - crd.antrea.io + resources: + - externalentities + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - externalnodes + verbs: + - get + - watch + - list +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: vm-agent +subjects: + - kind: ServiceAccount + name: vm-agent + namespace: test-ns # Change the namespace to where vm-agent is expected to run. \ No newline at end of file diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 29dedcfac2d..27312f83cba 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -40,6 +40,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" @@ -101,6 +102,7 @@ func run(o *Options) error { serviceInformer := informerFactory.Core().V1().Services() endpointsInformer := informerFactory.Core().V1().Endpoints() externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools() + externalNodeInformer := crdInformerFactory.Crd().V1alpha1().ExternalNodes() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -216,6 +218,7 @@ func run(o *Options) error { // Initialize agent and node network. agentInitializer := agent.NewInitializer( k8sClient, + crdClient, ovsBridgeClient, ofClient, routeClient, @@ -230,6 +233,7 @@ func run(o *Options) error { networkReadyCh, stopCh, o.nodeType, + o.config.Namespace, features.DefaultFeatureGate.Enabled(features.AntreaProxy), o.config.AntreaProxy.ProxyAll, connectUplinkToBridge) @@ -296,7 +300,16 @@ func run(o *Options) error { // podUpdateChannel is a channel for receiving Pod updates from CNIServer and // notifying NetworkPolicyController and EgressController to reconcile rules // related to the updated Pods. - podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + var podUpdateChannel *channel.SubscribableChannel + // externalNodeUpdateChannel is a channel for receiving ExternalNode updates from ExternalNodeController and + // notifying NetworkPolicyController to reconcile rules related to the updated ExternalNodes. + var externalNodeUpdateChannel *channel.SubscribableChannel + if o.nodeType == config.K8sNode { + podUpdateChannel = channel.NewSubscribableChannel("PodUpdate", 100) + } else { + externalNodeUpdateChannel = channel.NewSubscribableChannel("ExternalNodeUpdate", 100) + } + // We set flow poll interval as the time interval for rule deletion in the async // rule cache, which is implemented as part of the idAllocator. This is to preserve // the rule info for populating NetworkPolicy fields in the Flow Exporter even @@ -315,6 +328,7 @@ func run(o *Options) error { ifaceStore, nodeConfig.Name, podUpdateChannel, + externalNodeUpdateChannel, groupCounters, groupIDUpdates, antreaPolicyEnabled, @@ -323,6 +337,7 @@ func run(o *Options) error { loggingEnabled, asyncRuleDeleteInterval, o.config.DNSServerOverride, + o.nodeType, v4Enabled, v6Enabled) if err != nil { @@ -386,6 +401,7 @@ func run(o *Options) error { var cniServer *cniserver.CNIServer var cniPodInfoStore cnipodcache.CNIPodInfoStore + var externalNodeController *externalnode.ExternalNodeController if o.nodeType == config.K8sNode { isChaining := false if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { @@ -415,6 +431,12 @@ func run(o *Options) error { return fmt.Errorf("error initializing CNI server: %v", err) } } + } else { + externalNodeController, err = externalnode.NewExternalNodeController(ovsBridgeClient, ofClient, externalNodeInformer, + ifaceStore, externalNodeUpdateChannel, o.config.Namespace) + if err != nil { + return fmt.Errorf("error creating ExternalNode controller: %v", err) + } } var traceflowController *traceflow.Controller @@ -507,6 +529,9 @@ func run(o *Options) error { go podUpdateChannel.Run(stopCh) go cniServer.Run(stopCh) go nodeRouteController.Run(stopCh) + } else { + go externalNodeUpdateChannel.Run(stopCh) + go externalNodeController.Run(stopCh) } if networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec && diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 1479962b610..4e1621a404d 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -489,4 +489,7 @@ func (o *Options) setExternalNodeDefaultOptions() { o.config.EnablePrometheusMetrics = new(bool) *o.config.EnablePrometheusMetrics = false } + if o.config.Namespace == "" { + o.config.Namespace = "default" + } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 6d1233d414e..198fc15ca28 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/noderoute" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -44,6 +45,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/agent/wireguard" + "antrea.io/antrea/pkg/client/clientset/versioned" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/ovs/ovsctl" @@ -81,6 +83,7 @@ var otherConfigKeysForIPsecCertificates = []string{"certificate", "private_key", // Initializer knows how to setup host networking, OpenVSwitch, and Openflow. type Initializer struct { client clientset.Interface + crdClient versioned.Interface ovsBridgeClient ovsconfig.OVSBridgeClient ofClient openflow.Client routeClient route.Interface @@ -102,10 +105,12 @@ type Initializer struct { networkReadyCh chan<- struct{} stopCh <-chan struct{} nodeType config.NodeType + namespace string } func NewInitializer( k8sClient clientset.Interface, + crdClient versioned.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -120,6 +125,7 @@ func NewInitializer( networkReadyCh chan<- struct{}, stopCh <-chan struct{}, nodeType config.NodeType, + namespace string, enableProxy bool, proxyAll bool, connectUplinkToBridge bool, @@ -127,6 +133,7 @@ func NewInitializer( return &Initializer{ ovsBridgeClient: ovsBridgeClient, client: k8sClient, + crdClient: crdClient, ifaceStore: ifaceStore, ofClient: ofClient, routeClient: routeClient, @@ -140,6 +147,7 @@ func NewInitializer( networkReadyCh: networkReadyCh, stopCh: stopCh, nodeType: nodeType, + namespace: namespace, enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, @@ -279,9 +287,17 @@ func (i *Initializer) initInterfaceStore() error { case interfacestore.AntreaTunnel: intf = parseTunnelInterfaceFunc(port, ovsPort) case interfacestore.AntreaHost: - // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a - // specific interface in the interfaceStore. - intf = nil + if port.Name == i.ovsBridge { + // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a + // specific interface in the interfaceStore. + intf = nil + } else { + var err error + intf, err = externalnode.ParseHostInterfaceConfig(i.ovsBridgeClient, port, ovsPort) + if err != nil { + return err + } + } case interfacestore.AntreaContainer: // The port should be for a container interface. intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true) @@ -1153,6 +1169,15 @@ func (i *Initializer) initNodeLocalConfig() error { } func (i *Initializer) initVMLocalConfig(nodeName string) error { + klog.InfoS("Initialize VM config", "ExternalNode", nodeName) + wait.PollImmediateUntil(time.Minute, func() (done bool, err error) { + _, err = i.crdClient.CrdV1alpha1().ExternalNodes(i.namespace).Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return true, nil + + }, i.stopCh) i.nodeConfig = &config.NodeConfig{ Name: nodeName, Type: config.ExternalNode, diff --git a/pkg/agent/config/node_config.go b/pkg/agent/config/node_config.go index fdd3d2497ac..6c147928203 100644 --- a/pkg/agent/config/node_config.go +++ b/pkg/agent/config/node_config.go @@ -96,6 +96,8 @@ type AdapterNetConfig struct { Index int MAC net.HardwareAddr IP *net.IPNet + IPs []*net.IPNet + MTU int Gateway string DNSServers string Routes []interface{} diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index e0fc264f260..c9b0e6e94d4 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -27,12 +27,14 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/metrics" agenttypes "antrea.io/antrea/pkg/agent/types" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/externalnode" "antrea.io/antrea/pkg/util/k8s" ) @@ -336,7 +338,8 @@ func toServicesIndexFunc(obj interface{}) ([]string, error) { } // newRuleCache returns a new *ruleCache. -func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache { +func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, externalNodeUpdateSubscriber channel.Subscriber, + serviceGroupIDUpdate <-chan string, nodeType config.NodeType) *ruleCache { rules := cache.NewIndexer( ruleKeyFunc, cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc, toServicesIndex: toServicesIndexFunc}, @@ -349,8 +352,14 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub dirtyRuleHandler: dirtyRuleHandler, groupIDUpdates: serviceGroupIDUpdate, } - // Subscribe Pod update events from CNIServer. - podUpdateSubscriber.Subscribe(cache.processPodUpdate) + if nodeType == config.K8sNode { + // Subscribe Pod update events from CNIServer. + podUpdateSubscriber.Subscribe(cache.processPodUpdate) + } else { + // Subscribe ExternalNode update events from ExternalNodeController + externalNodeUpdateSubscriber.Subscribe(cache.processExternalNodeUpdate) + } + go cache.processGroupIDUpdates() return cache } @@ -379,6 +388,31 @@ func (c *ruleCache) processPodUpdate(e interface{}) { } } +// processExternalNodeUpdate will be called when ExternalNodeController publishes an ExternalNode update event. +// It finds out AppliedToGroups that contains this ExternalNode converted ExternalEntity and trigger reconciling +// of related rules. +// It can enforce NetworkPolicies to ExternalEntities after ExternalEntityInterface is realised in the interface store. +func (c *ruleCache) processExternalNodeUpdate(e interface{}) { + externalNode := e.(*crdv1alpha1.ExternalNode) + eeName := externalnode.GenExternalEntityName(externalNode) + c.appliedToSetLock.RLock() + defer c.appliedToSetLock.RUnlock() + externalEntityEquals := func(ee *v1beta.ExternalEntityReference, namespace string, name string) bool { + if ee.Namespace == namespace && ee.Name == name { + return true + } + return false + } + for group, memberSet := range c.appliedToSetByGroup { + for _, member := range memberSet.Items() { + if externalEntityEquals(member.ExternalEntity, externalNode.Namespace, eeName) { + c.onAppliedToGroupUpdate(group) + break + } + } + } +} + // processGroupIDUpdates is an infinite loop that takes Service groupID // update events from the channel, finds out rules that refer this Service in // ToServices field and use dirtyRuleHandler to re-queue these rules. diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 00c01d3ed25..9c0e0120993 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" @@ -278,7 +279,7 @@ func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, *channel.SubscribableCh recorder := newDirtyRuleRecorder() podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) ch2 := make(chan string, 100) - c := newRuleCache(recorder.Record, podUpdateChannel, ch2) + c := newRuleCache(recorder.Record, podUpdateChannel, nil, ch2, config.K8sNode) return c, recorder, podUpdateChannel } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index c0c06ccfd70..1b44b638db6 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" @@ -114,6 +115,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ifaceStore interfacestore.InterfaceStore, nodeName string, podUpdateSubscriber channel.Subscriber, + externalNodeUpdateSubscriber channel.Subscriber, groupCounters []proxytypes.GroupCounter, groupIDUpdates <-chan string, antreaPolicyEnabled bool, @@ -122,6 +124,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, loggingEnabled bool, asyncRuleDeleteInterval time.Duration, dnsServerOverride string, + nodeType config.NodeType, v4Enabled bool, v6Enabled bool) (*Controller, error) { idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID) @@ -144,7 +147,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } } c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled) - c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates) + c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalNodeUpdateSubscriber, groupIDUpdates, nodeType) if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index b7f54207c57..060e4d67261 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -31,6 +31,7 @@ import ( k8stesting "k8s.io/client-go/testing" "k8s.io/component-base/metrics/legacyregistry" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow" proxytypes "antrea.io/antrea/pkg/agent/proxy/types" @@ -58,8 +59,8 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator(false) groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, groupCounters, ch2, - true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53", true, false) + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, + true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false) reconciler := newMockReconciler() controller.reconciler = reconciler controller.antreaPolicyLogger = nil diff --git a/pkg/agent/controller/networkpolicy/reconciler.go b/pkg/agent/controller/networkpolicy/reconciler.go index f5b062666d5..77849cf60d4 100644 --- a/pkg/agent/controller/networkpolicy/reconciler.go +++ b/pkg/agent/controller/networkpolicy/reconciler.go @@ -873,22 +873,22 @@ func (r *reconciler) GetRuleByFlowID(ruleFlowID uint32) (*types.PolicyRule, bool func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 { ofPorts := sets.NewInt32() for _, m := range members { - var entityName, ns string var ifaces []*interfacestore.InterfaceConfig + var name, ns string if m.Pod != nil { - entityName, ns = m.Pod.Name, m.Pod.Namespace - ifaces = r.ifaceStore.GetContainerInterfacesByPod(entityName, ns) + name, ns = m.Pod.Name, m.Pod.Namespace + ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns) } else if m.ExternalEntity != nil { - entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace - ifaces = r.ifaceStore.GetInterfacesByEntity(entityName, ns) + name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns) } if len(ifaces) == 0 { // This might be because the container has been deleted during realization or hasn't been set up yet. - klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName) + klog.Infof("Can't find interface for %s/%s, skipping", ns, name) continue } for _, iface := range ifaces { - klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, entityName) + klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, name) ofPorts.Insert(iface.OFPort) } } @@ -898,24 +898,24 @@ func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 { func (r *reconciler) getIPs(members v1beta2.GroupMemberSet) sets.String { ips := sets.NewString() for _, m := range members { - var entityName, ns string var ifaces []*interfacestore.InterfaceConfig + var name, ns string if m.Pod != nil { - entityName, ns = m.Pod.Name, m.Pod.Namespace - ifaces = r.ifaceStore.GetContainerInterfacesByPod(entityName, ns) + name, ns = m.Pod.Name, m.Pod.Namespace + ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns) } else if m.ExternalEntity != nil { - entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace - ifaces = r.ifaceStore.GetInterfacesByEntity(entityName, ns) + name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns) } if len(ifaces) == 0 { // This might be because the container has been deleted during realization or hasn't been set up yet. - klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName) + klog.Infof("Can't find interface for %s/%s, skipping", ns, name) continue } for _, iface := range ifaces { for _, ipAddr := range iface.IPs { if ipAddr != nil { - klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, entityName) + klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, name) ips.Insert(ipAddr.String()) } } diff --git a/pkg/agent/controller/networkpolicy/status_controller_test.go b/pkg/agent/controller/networkpolicy/status_controller_test.go index d719e8baa10..8db35fa0bd9 100644 --- a/pkg/agent/controller/networkpolicy/status_controller_test.go +++ b/pkg/agent/controller/networkpolicy/status_controller_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" ) @@ -51,7 +52,7 @@ func (c *fakeNetworkPolicyControl) getNetworkPolicyStatus() *v1beta2.NetworkPoli } func newTestStatusController() (*StatusController, *ruleCache, *fakeNetworkPolicyControl) { - ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), make(chan string, 100)) + ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), nil, make(chan string, 100), config.K8sNode) statusControl := &fakeNetworkPolicyControl{} statusController := newStatusController(nil, testNode1, ruleCache) statusController.statusControlInterface = statusControl diff --git a/pkg/agent/externalnode/external_node_controller.go b/pkg/agent/externalnode/external_node_controller.go new file mode 100644 index 00000000000..5b5af8b9057 --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller.go @@ -0,0 +1,753 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import ( + "fmt" + "net" + "reflect" + "strconv" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + eninformer "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + enlister "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + binding "antrea.io/antrea/pkg/ovs/openflow" + "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/ovs/ovsctl" + "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/env" + "antrea.io/antrea/pkg/util/externalnode" + "antrea.io/antrea/pkg/util/ip" +) + +const ( + controllerName = "ExternalNodeController" + // How long to wait before retrying the processing of an ExternalNode change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Disable resyncing. + resyncPeriod time.Duration = 0 + + ovsExternalIDUplinkName = "uplink-name" + ovsExternalIDHostIFName = "hostInterface-name" + ovsExternalIDHostIFIndex = "hostInterface-index" + ovsExternalIDUplinkPort = "uplink-port" + ovsExternalIDEntityName = "entity-name" + ovsExternalIDEntityNamespace = "entity-namespace" + ovsExternalIDIP = "ip" + eeSplitter = "," + reserveRuleSplitter = ":" +) + +var ( + keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc + splitKeyFunc = cache.SplitMetaNamespaceKey +) + +type ExternalNodeController struct { + ovsBridgeClient ovsconfig.OVSBridgeClient + ovsctlClient ovsctl.OVSCtlClient + ofClient openflow.Client + externalNodeInformer cache.SharedIndexInformer + externalNodeLister enlister.ExternalNodeNamespaceLister + externalNodeListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + ifaceStore interfacestore.InterfaceStore + syncedExternalNodes cache.Store + // nodeUpdateNotifier is used for notifying updates of local ExternalNode to NetworkPolicyController. + nodeUpdateNotifier channel.Notifier + nodeName string + namespace string + reservedHostPorts []reserveHostPort + reservedRules []string +} + +type reserveHostPort struct { + // The protocol (TCP, UDP, or SCTP) which traffic must match. + protocol binding.Protocol + // The dst port on the given protocol. + port uint16 + // The remote IP to which is reserved. + ip net.IP + // The remote CIDR to which is reserved. + ipnet *net.IPNet + ingress bool +} + +func NewExternalNodeController(ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, externalNodeInformer eninformer.ExternalNodeInformer, + ifaceStore interfacestore.InterfaceStore, nodeUpdateNotifier channel.Notifier, namespace string) (*ExternalNodeController, error) { + c := &ExternalNodeController{ + ovsBridgeClient: ovsBridgeClient, + ovsctlClient: ovsctl.NewClient(ovsBridgeClient.GetBridgeName()), + ofClient: ofClient, + externalNodeInformer: externalNodeInformer.Informer(), + externalNodeLister: externalNodeInformer.Lister().ExternalNodes(namespace), + externalNodeListerSynced: externalNodeInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalNode"), + ifaceStore: ifaceStore, + syncedExternalNodes: cache.NewStore(keyFunc), + nodeUpdateNotifier: nodeUpdateNotifier, + } + nodeName, err := env.GetNodeName() + if err != nil { + return nil, err + } + c.nodeName = nodeName + c.namespace = namespace + c.externalNodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueExternalNodeAdd, + UpdateFunc: c.enqueueExternalNodeUpdate, + DeleteFunc: c.enqueueExternalNodeDelete, + }, + resyncPeriod) + + return c, nil +} + +func (c *ExternalNodeController) enqueueExternalNodeAdd(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + if en.Name != c.nodeName || en.Namespace != c.namespace { + return + } + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode ADD event", "ExternalNode", klog.KObj(en)) +} + +func (c *ExternalNodeController) enqueueExternalNodeUpdate(oldObj interface{}, newObj interface{}) { + oldEn := oldObj.(*v1alpha1.ExternalNode) + newEn := newObj.(*v1alpha1.ExternalNode) + if newEn.Name != c.nodeName || newEn.Namespace != c.namespace { + return + } + if reflect.DeepEqual(oldEn.Spec.Interfaces, newEn.Spec.Interfaces) { + klog.InfoS("Skip enqueuing ExternalNode UPDATE event as no changes for interfaces", "ExternalNode", klog.KObj(newEn)) + return + } + key, _ := keyFunc(newEn) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode UPDATE event", "ExternalNode", klog.KObj(newEn)) +} + +func (c *ExternalNodeController) enqueueExternalNodeDelete(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + if en.Name != c.nodeName || en.Namespace != c.namespace { + return + } + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode DELETE event", "ExternalNode", klog.KObj(en)) +} + +// Run will create defaultWorkers workers (goroutines) which will process the ExternalNode events from the work queue. +func (c *ExternalNodeController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.Infof("Starting %s", controllerName) + defer klog.Infof("Shutting down %s", controllerName) + + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.externalNodeListerSynced) { + klog.Error("Failed to wait for syncing cache for ExternalNodes") + return + } + + if err := c.reconcile(); err != nil { + klog.Errorf("Failed to reconcile %v", err) + return + } + + go wait.Until(c.worker, time.Second, stopCh) + + <-stopCh +} + +func (c *ExternalNodeController) reconcile() error { + klog.Info("ExternalNodeController starts reconciliation") + if err := c.reconcileHostUplinkFlows(); err != nil { + return fmt.Errorf("failed to reconcile host uplink flows %v", err) + } + if err := c.reconcileExternalEntityInterfaces(); err != nil { + return fmt.Errorf("failed to reconcile ExternalEntity interfaces %v", err) + } + if err := c.reconcilePolicyBypassFlows(); err != nil { + return fmt.Errorf("failed to reconcile reserved flows %v", err) + } + klog.Info("ExternalNodeController finishes reconciliation") + return nil +} + +func (c *ExternalNodeController) reconcileHostUplinkFlows() error { + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if err := c.ofClient.InstallVMUplinkFlows(hostIface.InterfaceName, hostIface.OVSPortConfig.OFPort, hostIface.UplinkPort.OFPort); err != nil { + klog.ErrorS(err, "Failed to install openflow entries to forward packet between uplink and host interface", "hostInterface", hostIface.InterfaceName) + return err + } + klog.InfoS("Reconcile host uplink flow for ExternalEntityInterface", "ifName", hostIface.InterfaceName) + } + return nil +} + +func (c *ExternalNodeController) reconcileExternalEntityInterfaces() error { + en, err := c.externalNodeLister.Get(c.nodeName) + if err != nil { + return err + } + if err = c.addExternalNode(en); err != nil { + return err + } + eeName := externalnode.GenExternalEntityName(en) + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if hostIface.EntityName != eeName || hostIface.EntityNamespace != en.Namespace { + if err = c.deleteInterface(hostIface.InterfaceName); err != nil { + return err + } + } + } + return nil +} + +func (c *ExternalNodeController) reconcilePolicyBypassFlows() error { + if err := c.getReservedHostPorts(); err != nil { + return err + } + for _, rhp := range c.reservedHostPorts { + klog.V(2).InfoS("Installing reserved flows", "protocol", rhp.protocol, "IP", rhp.ip, "IPNet", rhp.ipnet, "port", rhp.port, "ingress", rhp.ingress) + if err := c.ofClient.InstallPolicyBypassFlows(rhp.protocol, rhp.ipnet, rhp.ip, rhp.port, rhp.ingress); err != nil { + klog.ErrorS(err, "Failed to install reserved flows", "protocol", rhp.protocol, "IP", rhp.ip, "IPNet", rhp.ipnet, "port", rhp.port, "direction", rhp.ingress) + return err + } + } + klog.InfoS("Installed reserved flows") + return nil +} + +func (c *ExternalNodeController) getReservedHostPorts() error { + for _, s := range c.reservedRules { + rule := strings.Split(s, reserveRuleSplitter) + + // parse direction. + direction := false + if rule[0] == "in" { + direction = true + } + + // parse protocol. + var proto binding.Protocol + switch rule[1] { + case "tcp": + proto = binding.ProtocolTCP + case "udp": + proto = binding.ProtocolUDP + case "icmp": + proto = binding.ProtocolICMP + default: + proto = binding.ProtocolIP + } + + // parse remote IP or CIDR. + var remoteCIDR *net.IPNet + var remoteIP net.IP + var err error + if rule[2] != "" { + if strings.Contains(rule[2], "/") { + _, remoteCIDR, err = net.ParseCIDR(rule[2]) + if err != nil { + return err + } + } else { + remoteIP = net.ParseIP(rule[2]) + } + } + + // parse port number. + var port int + if rule[3] != "" { + port, _ = strconv.Atoi(rule[3]) + } + c.reservedHostPorts = append(c.reservedHostPorts, reserveHostPort{ + ipnet: remoteCIDR, + ip: remoteIP, + port: uint16(port), + protocol: proto, + ingress: direction, + }) + } + return nil +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the work queue. +func (c *ExternalNodeController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *ExternalNodeController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + if key, ok := obj.(string); !ok { + c.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncExternalNode(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.Errorf("Error syncing ExternalNode %s, requeuing. Error: %v", key, err) + } + return true +} + +func (c *ExternalNodeController) syncExternalNode(key string) error { + _, name, err := splitKeyFunc(key) + if err != nil { + // This err should not occur. + return err + } + en, err := c.externalNodeLister.Get(name) + if errors.IsNotFound(err) { + return c.deleteExternalNode(key) + } + preEn, exists, _ := c.syncedExternalNodes.GetByKey(key) + if !exists { + return c.addExternalNode(en) + } else { + return c.updateExternalNode(preEn, en) + } +} + +func (c *ExternalNodeController) addExternalNode(en *v1alpha1.ExternalNode) error { + klog.InfoS("Adding ExternalNode", "ExternalNode", klog.KObj(en)) + eeName := externalnode.GenExternalEntityName(en) + ifName, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.addInterface(ifName, en.Namespace, eeName, en.Spec.Interfaces[0].IPs); err != nil { + return err + } + c.syncedExternalNodes.Add(en) + // Notify the ExternalNode create event to NetworkPolicyController. + c.nodeUpdateNotifier.Notify(en) + return nil +} + +func (c *ExternalNodeController) addInterface(ifName string, eeNamespace string, eeName string, ips []string) error { + hostIface, portExists := c.ifaceStore.GetInterfaceByName(ifName) + if !portExists { + klog.InfoS("Creating OVS ports and flows for ExternalEntityInterface", "ifName", ifName, "eeName", eeName, "ip", ips) + uplinkName := genUplinkInterfaceName(ifName) + iface, err := c.createOVSPortsAndFlows(uplinkName, ifName, eeNamespace, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil + } + klog.InfoS("Updating OVS port data", "ifName", ifName, "eeName", eeName, "ip", ips) + iface, err := c.updateOVSPortsData(hostIface, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil +} + +func (c *ExternalNodeController) updateExternalNode(obj interface{}, curEn *v1alpha1.ExternalNode) error { + klog.InfoS("Updating ExternalNode", "ExternalNode", klog.KObj(curEn)) + preEn := obj.(*v1alpha1.ExternalNode) + preEEName := externalnode.GenExternalEntityName(preEn) + preIfName, err := getHostInterfaceName(preEn.Spec.Interfaces[0]) + if err != nil { + return err + } + curEEName := externalnode.GenExternalEntityName(curEn) + curIfName, err := getHostInterfaceName(curEn.Spec.Interfaces[0]) + if err != nil { + return err + } + if preIfName != curIfName { + klog.InfoS("Found interface name is changed", "preName", preIfName, "curName", curIfName) + if err = c.deleteInterface(preIfName); err != nil { + return err + } + if err = c.addInterface(curIfName, curEn.Namespace, curEEName, curEn.Spec.Interfaces[0].IPs); err != nil { + return err + } + } else if (!reflect.DeepEqual(preEn.Spec.Interfaces[0].IPs, curEn.Spec.Interfaces[0].IPs)) || (preEEName != curEEName) { + klog.InfoS("Found interface configuration is changed", "preIPs", preEn.Spec.Interfaces[0].IPs, "preEEName", preEEName, + "curIPs", curEn.Spec.Interfaces[0].IPs, "curEEName", curEEName) + if err = c.addInterface(curIfName, curEn.Namespace, curEEName, curEn.Spec.Interfaces[0].IPs); err != nil { + return err + } + } + c.syncedExternalNodes.Add(curEn) + // Notify the ExternalNode update event to NetworkPolicyController. + c.nodeUpdateNotifier.Notify(curEn) + return nil +} + +func (c *ExternalNodeController) deleteExternalNode(key string) error { + klog.InfoS("Deleting ExternalNode", "key", key) + obj, exists, _ := c.syncedExternalNodes.GetByKey(key) + if !exists { + klog.InfoS("Skipping ExternalNode deletion as it hasn't been synced", "ExternalEntityKey", key) + return nil + } + en := obj.(*v1alpha1.ExternalNode) + ifName, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.deleteInterface(ifName); err != nil { + return err + } + c.syncedExternalNodes.Delete(en) + return nil +} + +func (c *ExternalNodeController) deleteInterface(ifName string) error { + hostIface, portExists := c.ifaceStore.GetInterfaceByName(ifName) + if !portExists { + klog.InfoS("Skipping deleting host interface since it doesn't exist ", "ifName", ifName) + return nil + } + klog.InfoS("Deleting interface", "ifName", ifName) + if err := c.removeOVSPortsAndFlows(hostIface); err != nil { + return err + } + c.ifaceStore.DeleteInterface(hostIface) + return nil +} + +func (c *ExternalNodeController) createOVSPortsAndFlows(uplinkName, hostIfName, eeNamespace string, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + adapterConfig, err := util.GetUplinkConfig(hostIfName) + if err != nil { + klog.ErrorS(err, "Failed to get the configuration on the host interface", "hostInterface", hostIfName) + return nil, err + } + if err := util.RenameInterface(hostIfName, uplinkName); err != nil { + klog.ErrorS(err, "Failed to rename host interface name as the uplink name", "hostInterface", hostIfName, "uplink", uplinkName) + return nil, err + } + success := false + defer func() { + if !success { + util.RenameInterface(uplinkName, hostIfName) + } + }() + // Create uplink port on OVS. + uplinkExternalIDs := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaUplink, + } + uplinkUUID, err := c.ovsBridgeClient.CreatePort(uplinkName, uplinkName, uplinkExternalIDs) + if err != nil { + klog.ErrorS(err, "Failed to create uplink port on OVS", "uplink", uplinkName) + return nil, err + } + defer func() { + if !success { + c.ovsBridgeClient.DeletePort(uplinkUUID) + } + }() + uplinkOFPort, err := c.ovsBridgeClient.GetOFPort(uplinkName, false) + if err != nil { + klog.ErrorS(err, "Failed to get uplink ofport", "uplink", uplinkName) + return nil, err + } + klog.V(2).InfoS("Added uplink port on OVS", "port", uplinkName) + + // Create host port on OVS. + hostIfUUID, err := c.ovsBridgeClient.CreateInternalPort(hostIfName, 0, nil) + if err != nil { + klog.ErrorS(err, "Failed to create host port on OVS", "hostInterface", hostIfName) + return nil, err + } + defer func() { + if !success { + c.ovsBridgeClient.DeletePort(hostIfUUID) + } + }() + hostOFPort, err := c.ovsBridgeClient.GetOFPort(hostIfName, false) + if err != nil { + klog.ErrorS(err, "Failed to get host interface ofport", "hostInterface", uplinkName) + return nil, err + } + klog.V(2).InfoS("Added host port on OVS", "port", hostIfName) + + hostIF, err := net.InterfaceByName(hostIfName) + if err != nil { + return nil, err + } + attachInfo := getOVSAttachInfo(uplinkName, uplinkUUID, fmt.Sprintf("%d", hostIF.Index), hostIF.Name, eeName, eeNamespace, ips) + // Update OVS port external_ids with the host interface id and name. + if err := c.ovsBridgeClient.SetPortExternalIDs(hostIfName, attachInfo); err != nil { + return nil, err + } + if err := util.MoveIFConfigurations(adapterConfig.IPs, adapterConfig.Routes, adapterConfig.MAC, adapterConfig.MTU, uplinkName, hostIfName); err != nil { + klog.ErrorS(err, "Failed to move configuration from the host interface to uplink", "hostInterface", hostIfName, "uplink", uplinkName) + return nil, err + } + klog.V(2).InfoS("Moved configuration from the uplink to host port", "uplink", uplinkName, "hostInterface", hostIfName) + if err := c.ofClient.InstallVMUplinkFlows(hostIfName, hostOFPort, uplinkOFPort); err != nil { + return nil, err + } + klog.InfoS("Added uplink and host port on OVS and installed openflow entries", "uplink", uplinkName, "hostInterface", hostIfName) + success = true + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + hostIFConfig := &interfacestore.InterfaceConfig{ + Type: interfacestore.ExternalEntityInterface, + InterfaceName: hostIfName, + IPs: ifIPs, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: hostIfUUID, + OFPort: hostOFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: eeNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkUUID, + OFPort: uplinkOFPort, + }, + HostIfaceIndex: hostIF.Index, + }, + } + return hostIFConfig, nil +} + +func getOVSAttachInfo(uplinkName, uplinkUUID, hostIFIdx, hostPortName, entityName, entityNamespace string, ips []string) map[string]interface{} { + attachInfo := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + if uplinkName != "" { + attachInfo[ovsExternalIDUplinkName] = uplinkName + } + if uplinkUUID != "" { + attachInfo[ovsExternalIDUplinkPort] = uplinkUUID + } + if hostIFIdx != "" { + attachInfo[ovsExternalIDHostIFIndex] = hostIFIdx + } + if hostPortName != "" { + attachInfo[ovsExternalIDHostIFName] = hostPortName + } + if entityName != "" { + attachInfo[ovsExternalIDEntityName] = entityName + } + if entityNamespace != "" { + attachInfo[ovsExternalIDEntityNamespace] = entityNamespace + } + if len(ips) != 0 { + attachInfo[ovsExternalIDIP] = strings.Join(ips, eeSplitter) + } + + return attachInfo +} + +func (c *ExternalNodeController) updateOVSPortsData(interfaceConfig *interfacestore.InterfaceConfig, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + portUUID := interfaceConfig.PortUUID + portName := interfaceConfig.InterfaceName + port, err := c.ovsBridgeClient.GetPortData(portUUID, portName) + if err != nil { + return nil, err + } + + attachInfo := map[string]interface{}{ + ovsExternalIDUplinkName: port.ExternalIDs[ovsExternalIDUplinkName], + ovsExternalIDHostIFIndex: port.ExternalIDs[ovsExternalIDHostIFIndex], + ovsExternalIDHostIFName: port.ExternalIDs[ovsExternalIDHostIFName], + ovsExternalIDUplinkPort: port.ExternalIDs[ovsExternalIDUplinkPort], + ovsExternalIDEntityName: eeName, + ovsExternalIDEntityNamespace: port.ExternalIDs[ovsExternalIDEntityName], + ovsExternalIDIP: strings.Join(ips, eeSplitter), + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + err = c.ovsBridgeClient.SetPortExternalIDs(portName, attachInfo) + if err != nil { + return nil, err + } + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + iface := &interfacestore.InterfaceConfig{ + InterfaceName: interfaceConfig.InterfaceName, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.PortUUID, + OFPort: interfaceConfig.OFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: interfaceConfig.EntityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.UplinkPort.PortUUID, + OFPort: interfaceConfig.UplinkPort.OFPort, + }, + HostIfaceIndex: interfaceConfig.EntityInterfaceConfig.HostIfaceIndex, + }, + IPs: ifIPs, + } + return iface, nil +} + +func (c *ExternalNodeController) removeOVSPortsAndFlows(interfaceConfig *interfacestore.InterfaceConfig) error { + portUUID := interfaceConfig.PortUUID + portName := interfaceConfig.InterfaceName + if err := c.ofClient.UninstallVMUplinkFlows(portName); err != nil { + return err + } + port, err := c.ovsBridgeClient.GetPortData(portUUID, portName) + if err != nil { + if strings.Contains(err.Error(), "not found") { + klog.ErrorS(err, "OVS port is not found", "port", portName) + return nil + } + return err + } + hostIfName := port.ExternalIDs[ovsExternalIDHostIFName] + uplinkIfName := port.ExternalIDs[ovsExternalIDUplinkName] + uplinkPortID := port.ExternalIDs[ovsExternalIDUplinkPort] + adapterConfig, er := util.GetUplinkConfig(hostIfName) + if er != nil { + klog.ErrorS(err, "Failed to get the configuration on the host interface", "hostInterface", hostIfName) + return err + } + if err := c.ovsBridgeClient.DeletePort(port.UUID); err != nil { + klog.ErrorS(err, "Failed to delete port on OVS", "port", hostIfName) + return err + } + if err := c.ovsBridgeClient.DeletePort(uplinkPortID); err != nil { + return err + } + defer func() { + // Delete host interface from OVS datapath if it is existing. + // This is to resolve an issue that OVS fails to remove the interface from datapath. It might happen because the interface + // is busy when OVS tries to remove it with the OVSDB interface deletion event. + if err := c.ovsctlClient.DeleteDPInterface(hostIfName); err != nil { + klog.ErrorS(err, "Failed delete host interface from OVS datapath", "interface", hostIfName) + } + }() + + // Wait until the host interface created by OVS is removed. + pollErr := wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (bool, error) { + return !util.HostInterfaceExists(hostIfName), nil + }) + if pollErr != nil { + klog.ErrorS(pollErr, "Failed to wait for host interface deletion in 2s", "interface", hostIfName) + } + // Recover the uplink interface's name. + if err := util.RenameInterface(uplinkIfName, hostIfName); err != nil { + klog.ErrorS(err, "Failed to recover uplink name to the host interface name", "uplink", uplinkIfName, "hostInterface", hostIfName) + return err + } + // Move the IP configurations back to the host interface. + if err := util.MoveIFConfigurations(adapterConfig.IPs, adapterConfig.Routes, adapterConfig.MAC, adapterConfig.MTU, uplinkIfName, hostIfName); err != nil { + klog.ErrorS(err, "Failed to move back configuration to the host interface", "hostInterface", hostIfName) + return err + } + return nil +} + +func getHostInterfaceName(iface v1alpha1.NetworkInterface) (string, error) { + var ipFilter *ip.DualStackIPs + epIP := net.ParseIP(iface.IPs[0]) + if epIP.To4() != nil { + ipFilter = &ip.DualStackIPs{IPv4: epIP} + } else { + ipFilter = &ip.DualStackIPs{IPv6: epIP} + } + _, _, link, err := util.GetIPNetDeviceFromIP(ipFilter, sets.NewString()) + if err == nil { + klog.InfoS("Using the interface", "linkName", link.Name) + return link.Name, nil + } + return "", err + +} + +func ParseHostInterfaceConfig(ovsBridgeClient ovsconfig.OVSBridgeClient, portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) (*interfacestore.InterfaceConfig, error) { + var interfaceConfig *interfacestore.InterfaceConfig + interfaceConfig = &interfacestore.InterfaceConfig{ + InterfaceName: portData.Name, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: portConfig, + } + var hostUplinkConfig *interfacestore.EntityInterfaceConfig + if portData.ExternalIDs != nil { + entityIPStrs := strings.Split(portData.ExternalIDs[ovsExternalIDIP], eeSplitter) + var entityIPs []net.IP + for _, ipStr := range entityIPStrs { + entityIPs = append(entityIPs, net.ParseIP(ipStr)) + } + interfaceConfig.IPs = entityIPs + uplinkName, _ := portData.ExternalIDs[ovsExternalIDUplinkName] + uplinkPortUUID, _ := portData.ExternalIDs[ovsExternalIDUplinkPort] + uplinkPortData, ovsErr := ovsBridgeClient.GetPortData(uplinkPortUUID, uplinkName) + if ovsErr != nil { + klog.Info("Failed to get uplink port data") + return nil, ovsErr + } + hostLinkStr := portData.ExternalIDs[ovsExternalIDHostIFIndex] + hostLink, err := strconv.Atoi(hostLinkStr) + if err != nil { + klog.Info("Failed to get interface name from port data") + return nil, err + } + entityName, _ := portData.ExternalIDs[ovsExternalIDEntityName] + entityNamespace, _ := portData.ExternalIDs[ovsExternalIDEntityNamespace] + hostUplinkConfig = &interfacestore.EntityInterfaceConfig{ + EntityName: entityName, + EntityNamespace: entityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkPortUUID, + OFPort: uplinkPortData.OFPort, + }, + HostIfaceIndex: hostLink, + } + } + interfaceConfig.EntityInterfaceConfig = hostUplinkConfig + return interfaceConfig, nil +} + +func genUplinkInterfaceName(hostIfName string) string { + return fmt.Sprintf("phy-%s", hostIfName) +} diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 0496adf8c7d..a28c491b01b 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -389,3 +389,19 @@ func GenerateRandomMAC() net.HardwareAddr { buf[0] |= 2 return buf } + +func GetGlobalIPNetsByName(link *net.Interface) ([]*net.IPNet, error) { + addrList, err := link.Addrs() + if err != nil { + return nil, err + } + var addrs []*net.IPNet + for _, a := range addrList { + if ipNet, ok := a.(*net.IPNet); ok { + if ipNet.IP.IsGlobalUnicast() { + addrs = append(addrs, ipNet) + } + } + } + return addrs, nil +} diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go index eb25c5b737f..9c0b4d5bdc4 100644 --- a/pkg/agent/util/net_linux.go +++ b/pkg/agent/util/net_linux.go @@ -23,11 +23,15 @@ import ( "os" "os/exec" "path/filepath" + "time" "github.com/containernetworking/plugins/pkg/ip" "github.com/containernetworking/plugins/pkg/ns" "github.com/vishvananda/netlink" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" ) // GetNetLink returns dev link from name. @@ -237,3 +241,172 @@ func DeleteOVSPort(brName, portName string) error { cmd := exec.Command("ovs-vsctl", "--if-exists", "del-port", brName, portName) return cmd.Run() } + +func GetNetRoutesOnAdapter(linkIndex int) (string, []interface{}, error) { + link, err := netlink.LinkByIndex(linkIndex) + if err != nil { + return "", nil, err + } + rs, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return "", nil, err + } + var gw string + var routes []interface{} + for _, r := range rs { + if r.Gw != nil && r.Dst != nil && r.Dst.IP.IsUnspecified() { + gw = r.Gw.String() + } + routes = append(routes, r) + } + return gw, routes, nil +} + +func RenameInterface(from, to string) error { + var renameErr error + pollErr := wait.Poll(time.Millisecond*100, time.Second, func() (done bool, err error) { + renameErr = renameHostInterface(from, to) + if renameErr != nil { + klog.InfoS("Unable to rename host interface name with error, retrying", "oldName", from, "newName", to, "err", renameErr) + return false, nil + } + return true, nil + }) + if pollErr != nil { + return fmt.Errorf("failed to rename host interface name %s to %s", from, to) + } + return nil +} + +func renameHostInterface(oriName string, newName string) error { + link, err := netlink.LinkByName(oriName) + if err != nil { + return err + } + if err := netlink.LinkSetDown(link); err != nil { + return err + } + defer func() { + netlink.LinkSetUp(link) + }() + if err := netlink.LinkSetName(link, newName); err != nil { + return err + } + return nil +} + +func MoveIFConfigurations(ips []*net.IPNet, routes []interface{}, mac net.HardwareAddr, mtu int, from string, to string) error { + toIF, err := netlink.LinkByName(to) + if err != nil { + return err + } + fromExists := true + fromIF, err := netlink.LinkByName(from) + if err != nil { + if _, ok := err.(netlink.LinkNotFoundError); !ok { + return err + } + fromExists = false + } + if fromExists { + if err := netlink.LinkSetHardwareAddr(toIF, mac); err != nil { + return err + } + if err := netlink.LinkSetMTU(toIF, mtu); err != nil { + return err + } + if err := netlink.LinkSetUp(toIF); err != nil { + return err + } + if err := removeUplinkIPRoutes(fromIF); err != nil { + return err + } + } + toIndex := toIF.Attrs().Index + // Copy the uplink interface's IP to the veth interface. + if err := ConfigureLinkAddresses(toIndex, ips); err != nil { + return err + } + // Copy the uplink interface's Route to the veth interface. + if err := copyRoutes(toIF, routes); err != nil { + return err + } + return nil +} + +// removeUplinkIPRoutes removes the IP and Routes from the uplink interface. +func removeUplinkIPRoutes(uplinkIF netlink.Link) error { + addrs, err := netlink.AddrList(uplinkIF, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range addrs { + if err := netlink.AddrDel(uplinkIF, &addrs[i]); err != nil { + return err + } + } + return removeRoutesOnLink(uplinkIF) +} + +func removeRoutesOnLink(link netlink.Link) error { + routes, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range routes { + if err := netlink.RouteDel(&routes[i]); err != nil { + return err + } + } + return nil +} + +func copyRoutes(toIF netlink.Link, routes []interface{}) error { + for _, r := range routes { + rt := r.(netlink.Route) + rt.LinkIndex = toIF.Attrs().Index + if err := netlink.RouteReplace(&rt); err != nil { + return err + } + } + return nil +} + +func HostInterfaceExists(ifName string) bool { + _, err := netlink.LinkByName(ifName) + if err == nil { + return true + } + if _, ok := err.(netlink.LinkNotFoundError); ok { + return false + } + klog.ErrorS(err, "Failed to find host interface", "name", ifName) + return false +} + +func GetUplinkConfig(uplinkIfName string) (*config.AdapterNetConfig, error) { + iface, err := net.InterfaceByName(uplinkIfName) + if err != nil { + klog.ErrorS(err, "Failed to get interface", "uplinkIfName", uplinkIfName) + return nil, err + } + addrs, err := GetGlobalIPNetsByName(iface) + if err != nil { + klog.ErrorS(err, "Failed to get address", "iface", iface) + return nil, err + } + gw, routes, err := GetNetRoutesOnAdapter(iface.Index) + if err != nil { + klog.ErrorS(err, "Failed to get routes", "iface.Index", iface.Index) + return nil, err + } + return &config.AdapterNetConfig{ + Name: uplinkIfName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + Gateway: gw, + MTU: iface.MTU, + }, nil +} diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index ac5abc8c7a5..481e55e35ab 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" ps "antrea.io/antrea/pkg/agent/util/powershell" ) @@ -895,3 +896,61 @@ func ReplaceNetNeighbor(neighbor *Neighbor) error { func VirtualAdapterName(name string) string { return fmt.Sprintf("%s (%s)", ContainerVNICPrefix, name) } + +func GetUplinkConfig(uplinkIfName string) (*config.AdapterNetConfig, error) { + iface, err := net.InterfaceByName(uplinkIfName) + if err != nil { + klog.ErrorS(err, "Failed to get interface", "uplinkIfName", uplinkIfName) + return nil, err + } + addrs, err := GetGlobalIPNetsByName(iface) + if err != nil { + klog.ErrorS(err, "Failed to get address", "iface", iface) + return nil, err + } + gw, routes, err := GetNetRoutesOnAdapter(iface.Index) + if err != nil { + klog.ErrorS(err, "Failed to get routes", "iface.Index", iface.Index) + return nil, err + } + return &config.AdapterNetConfig{ + Name: uplinkIfName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + Gateway: gw, + MTU: iface.MTU, + }, nil +} + +// TODO: Implement MoveIFConfigurations for windows +func MoveIFConfigurations(ips []*net.IPNet, routes []interface{}, mac net.HardwareAddr, mtu int, from string, to string) error { + return nil +} + +// TODO: Implement GetNetRoutesOnAdapter for windows +func GetNetRoutesOnAdapter(linkIndex int) (string, []interface{}, error) { + return "", nil, nil +} + +func RenameInterface(from, to string) error { + var renameErr error + pollErr := wait.Poll(time.Millisecond*100, time.Second, func() (done bool, err error) { + renameErr = renameHostInterface(from, to) + if renameErr != nil { + klog.InfoS("Unable to rename host interface name with error, retrying", "oldName", from, "newName", to, "err", renameErr) + return false, nil + } + return true, nil + }) + if pollErr != nil { + return fmt.Errorf("failed to rename host interface name %s to %s", from, to) + } + return nil +} + +// TODO: Implement renameHostInterface for windowsß +func renameHostInterface(oriName string, newName string) error { + return nil +} diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 07f5d85b76e..f55ec63068f 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -197,6 +197,10 @@ type AgentConfig struct { // NodeType is type of the Node where Antrea Agent is running. // Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". NodeType string `yaml:"nodeType,omitempty"` + // The namespace to create the ExternalNode for the VM/BM object. + // The default value is "default". + // It is used only when NodeType is externalNode. + Namespace string `yaml:"namespace,omitempty"` } type AntreaProxyConfig struct { diff --git a/pkg/ovs/ovsctl/appctl.go b/pkg/ovs/ovsctl/appctl.go index cf1e36eb681..158b43c04a9 100644 --- a/pkg/ovs/ovsctl/appctl.go +++ b/pkg/ovs/ovsctl/appctl.go @@ -19,6 +19,7 @@ import ( "bytes" "fmt" "net" + "strconv" "strings" "k8s.io/klog/v2" @@ -227,3 +228,45 @@ func (c *ovsCtlClient) GetDPFeatures() (map[DPFeature]bool, error) { } return features, nil } + +func (c *ovsCtlClient) DeleteDPInterface(name string) error { + cmd := fmt.Sprintf("dpctl/show ovs-system") + out, err := c.runAppCtl(cmd, true) + if err == nil { + return nil + } + scanner := bufio.NewScanner(strings.NewReader(string(out))) + scanner.Split(bufio.ScanLines) + ports := make([]int, 0) + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, fmt.Sprintf(": %s", name)) { + continue + } + fields := strings.Split(line, ":") + if len(fields) != 2 { + klog.InfoS("Unexpected output from dpif/show ovs-system", "line", line) + continue + } + portStr := strings.Split(fields[0], " ")[1] + portNo, err := strconv.Atoi(portStr) + if err != nil { + klog.InfoS("Unable to parse port number", "port", portStr) + continue + } + ports = append(ports, portNo) + } + if len(ports) == 0 { + return nil + } + for _, port := range ports { + cmd = fmt.Sprintf("dpctl/del-if ovs-system %d", port) + _, err := c.runAppCtl(cmd, true) + if err == nil || strings.Contains(err.Error(), "No such device") { + return nil + } else { + return err + } + } + return nil +} diff --git a/pkg/ovs/ovsctl/interface.go b/pkg/ovs/ovsctl/interface.go index b6c74088621..0466ece467b 100644 --- a/pkg/ovs/ovsctl/interface.go +++ b/pkg/ovs/ovsctl/interface.go @@ -60,6 +60,8 @@ type OVSCtlClient interface { RunAppctlCmd(cmd string, needsBridge bool, args ...string) ([]byte, *ExecError) // GetDPFeatures executes "ovs-appctl dpif/show-dp-features" to check supported DP features. GetDPFeatures() (map[DPFeature]bool, error) + // DeleteDPInterface executes "vs-appctl dpctl/del-if ovs-system $name" to delete DP interface. + DeleteDPInterface(name string) error } type BadRequestError string diff --git a/pkg/ovs/ovsctl/testing/mock_ovsctl.go b/pkg/ovs/ovsctl/testing/mock_ovsctl.go index 05e0663d8a9..fc8b37f2c35 100644 --- a/pkg/ovs/ovsctl/testing/mock_ovsctl.go +++ b/pkg/ovs/ovsctl/testing/mock_ovsctl.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,6 +48,20 @@ func (m *MockOVSCtlClient) EXPECT() *MockOVSCtlClientMockRecorder { return m.recorder } +// DeleteDPInterface mocks base method +func (m *MockOVSCtlClient) DeleteDPInterface(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteDPInterface", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteDPInterface indicates an expected call of DeleteDPInterface +func (mr *MockOVSCtlClientMockRecorder) DeleteDPInterface(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteDPInterface", reflect.TypeOf((*MockOVSCtlClient)(nil).DeleteDPInterface), arg0) +} + // DumpFlows mocks base method func (m *MockOVSCtlClient) DumpFlows(arg0 ...string) ([]string, error) { m.ctrl.T.Helper()