From 5557fdd5c44b628cc4700c523127e0a363713346 Mon Sep 17 00:00:00 2001 From: Mengdie Song Date: Tue, 10 May 2022 17:37:10 +0800 Subject: [PATCH] [ExternalNode] Handle ExternalNode from Antrea agent side 1. Provide an example RBAC yaml file for Antrea agent running on VM with definitions of ClusterRole, ServiceAccount and ClusterRoleBinding. 2. Add ExternalNodeController to monitor ExternalNode CRUD, invoke interfaces to operate OVS and update interface store with ExternalEntitiyInterface. 3. Implement OVS interactions related to ExternalNode CRUD. 4. Add a channel for receiving ExternalNode updates from ExternalNodeController and notifying NetworkPolicyController to reconcile rules related to the updated ExternalNodes. This is to handle the case when NetworkPolicyController reconciles rules before ExternalEntitityInterface is realized in the interface store. 5. Update NetworkPolicy reconciler to invoke GetInterfacesByEntity() and GetContainerInterfacesByPod() for ExternalEntity and Pod separately. Signed-off-by: Mengdie Song Co-authored-by: Wenying Dong --- build/charts/antrea/conf/antrea-agent.conf | 3 - build/yamls/antrea-aks.yml | 7 +- build/yamls/antrea-eks.yml | 7 +- build/yamls/antrea-gke.yml | 7 +- build/yamls/antrea-ipsec.yml | 7 +- build/yamls/antrea.yml | 7 +- .../yamls/externalnode/conf/antrea-agent.conf | 4 + build/yamls/externalnode/vm-agent-rbac.yml | 101 +++ cmd/antrea-agent/agent.go | 27 +- cmd/antrea-agent/options.go | 3 + pkg/agent/agent.go | 31 +- pkg/agent/config/node_config.go | 2 + pkg/agent/controller/networkpolicy/cache.go | 40 +- .../controller/networkpolicy/cache_test.go | 3 +- .../networkpolicy/networkpolicy_controller.go | 5 +- .../networkpolicy_controller_test.go | 5 +- .../controller/networkpolicy/reconciler.go | 28 +- .../networkpolicy/status_controller_test.go | 3 +- .../externalnode/external_node_controller.go | 753 ++++++++++++++++++ pkg/agent/util/net.go | 16 + pkg/agent/util/net_linux.go | 173 ++++ pkg/agent/util/net_windows.go | 59 ++ pkg/config/agent/config.go | 4 + pkg/ovs/ovsctl/appctl.go | 43 + pkg/ovs/ovsctl/interface.go | 2 + pkg/ovs/ovsctl/testing/mock_ovsctl.go | 16 +- 26 files changed, 1301 insertions(+), 55 deletions(-) create mode 100644 build/yamls/externalnode/vm-agent-rbac.yml create mode 100644 pkg/agent/externalnode/external_node_controller.go diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 1814cdeb34a..0527661c0eb 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -53,9 +53,6 @@ featureGates: # Enable certificated-based authentication for IPsec. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "IPsecCertAuth" "default" false) }} -# Enable running agent on an unmanaged VM/BM. -{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "ExternalNode" "default" false) }} - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index edcebc90eaa..b11e67b05f5 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-agent @@ -3888,7 +3885,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 296f6a05dae..141063bdb35 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-agent @@ -3890,7 +3887,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c008cffcecc9959862be5461e6967f4c49c0bcc06dd9926cacc0ef2a37e5111c + checksum/config: add862bdadbc70ea7e2e2c44fc60f52d9ee753d5d1bad4c6ed6fc158cfd29d12 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index ac0381c09d8..a60c560afb2 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7cbfbb91897e3ebdbb7914a2e6831cef47ef08e6104c00aca6984bf32bd0e022 + checksum/config: d0d5d7f0e4e920f762944fdd78e439ec68af61926a68f7f7f49f8f1ab0ac7fa6 labels: app: antrea component: antrea-agent @@ -3888,7 +3885,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 7cbfbb91897e3ebdbb7914a2e6831cef47ef08e6104c00aca6984bf32bd0e022 + checksum/config: d0d5d7f0e4e920f762944fdd78e439ec68af61926a68f7f7f49f8f1ab0ac7fa6 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 9b65fbc232e..40eb9c46fce 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -124,9 +124,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3661,7 +3658,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 32f6800db6e4eaf5f97ee1d2135b07b44a96fc83bbd8f846b7f8fb616414edc6 + checksum/config: 50764285ac4a299fe772f50f3e5f8b1ff320285f61e0e907353805fe881a06d7 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -3947,7 +3944,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 32f6800db6e4eaf5f97ee1d2135b07b44a96fc83bbd8f846b7f8fb616414edc6 + checksum/config: 50764285ac4a299fe772f50f3e5f8b1ff320285f61e0e907353805fe881a06d7 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index d3b601f57de..3dbe4e0aea6 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -111,9 +111,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3648,7 +3645,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d729415bf3a9dafaff0d537bbc7104b4a8e672959f7804e342ef25d586747e83 + checksum/config: 18b98dbb7837e8da7cebaf3c0cb35758708fd2372bc02f73aca86e01539f93a2 labels: app: antrea component: antrea-agent @@ -3888,7 +3885,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d729415bf3a9dafaff0d537bbc7104b4a8e672959f7804e342ef25d586747e83 + checksum/config: 18b98dbb7837e8da7cebaf3c0cb35758708fd2372bc02f73aca86e01539f93a2 labels: app: antrea component: antrea-controller diff --git a/build/yamls/externalnode/conf/antrea-agent.conf b/build/yamls/externalnode/conf/antrea-agent.conf index 55e8015bfa2..19b66ed85e4 100644 --- a/build/yamls/externalnode/conf/antrea-agent.conf +++ b/build/yamls/externalnode/conf/antrea-agent.conf @@ -32,6 +32,10 @@ featureGates: # Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". nodeType: externalNode +# Namespace is the expected namespace to create the ExternalNode for the VM/BM object. +# Defaults to "default". +#namespace: default + # The path to access the kubeconfig file used in the connection to K8s APIServer. The file contains the K8s # APIServer endpoint and the token of ServiceAccount required in the connection. clientConnection: diff --git a/build/yamls/externalnode/vm-agent-rbac.yml b/build/yamls/externalnode/vm-agent-rbac.yml new file mode 100644 index 00000000000..49d0c7ab856 --- /dev/null +++ b/build/yamls/externalnode/vm-agent-rbac.yml @@ -0,0 +1,101 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vm-agent + namespace: test-ns # Change the namespace to where vm-agent is expected to run. +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +rules: + # antrea-controller distributes the CA certificate as a ConfigMap named `antrea-ca` in the Antrea deployment Namespace. + # vm-agent needs to access `antrea-ca` to connect with antrea-controller. + - apiGroups: + - "" + resources: + - configmaps + resourceNames: + - antrea-ca + verbs: + - get + - watch + - list + # This is the content of built-in role kube-system/extension-apiserver-authentication-reader. + # But it doesn't have list/watch permission before K8s v1.17.0 so the extension apiserver (vm-agent) will + # have permission issue after bumping up apiserver library to a version that supports dynamic authentication. + # See https://github.com/kubernetes/kubernetes/pull/85375 + # To support K8s clusters older than v1.17.0, we grant the required permissions directly instead of relying on + # the extension-apiserver-authentication role. + - apiGroups: + - "" + resourceNames: + - extension-apiserver-authentication + resources: + - configmaps + verbs: + - get + - list + - watch + - apiGroups: + - crd.antrea.io + resourceNames: + - VM1 # Change the ExternalNode name which vm-agent is expected to update. + resources: + - antreaagentinfos + verbs: + - get + - update + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies + - appliedtogroups + - addressgroups + verbs: + - get + - watch + - list + - apiGroups: + - controlplane.antrea.io + resources: + - nodestatssummaries + verbs: + - create + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies/status + verbs: + - create + - get + - apiGroups: + - crd.antrea.io + resources: + - externalentities + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - externalnodes + verbs: + - get + - watch + - list +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: vm-agent +subjects: + - kind: ServiceAccount + name: vm-agent + namespace: test-ns # Change the namespace to where vm-agent is expected to run. \ No newline at end of file diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 29dedcfac2d..27312f83cba 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -40,6 +40,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" @@ -101,6 +102,7 @@ func run(o *Options) error { serviceInformer := informerFactory.Core().V1().Services() endpointsInformer := informerFactory.Core().V1().Endpoints() externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools() + externalNodeInformer := crdInformerFactory.Crd().V1alpha1().ExternalNodes() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -216,6 +218,7 @@ func run(o *Options) error { // Initialize agent and node network. agentInitializer := agent.NewInitializer( k8sClient, + crdClient, ovsBridgeClient, ofClient, routeClient, @@ -230,6 +233,7 @@ func run(o *Options) error { networkReadyCh, stopCh, o.nodeType, + o.config.Namespace, features.DefaultFeatureGate.Enabled(features.AntreaProxy), o.config.AntreaProxy.ProxyAll, connectUplinkToBridge) @@ -296,7 +300,16 @@ func run(o *Options) error { // podUpdateChannel is a channel for receiving Pod updates from CNIServer and // notifying NetworkPolicyController and EgressController to reconcile rules // related to the updated Pods. - podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + var podUpdateChannel *channel.SubscribableChannel + // externalNodeUpdateChannel is a channel for receiving ExternalNode updates from ExternalNodeController and + // notifying NetworkPolicyController to reconcile rules related to the updated ExternalNodes. + var externalNodeUpdateChannel *channel.SubscribableChannel + if o.nodeType == config.K8sNode { + podUpdateChannel = channel.NewSubscribableChannel("PodUpdate", 100) + } else { + externalNodeUpdateChannel = channel.NewSubscribableChannel("ExternalNodeUpdate", 100) + } + // We set flow poll interval as the time interval for rule deletion in the async // rule cache, which is implemented as part of the idAllocator. This is to preserve // the rule info for populating NetworkPolicy fields in the Flow Exporter even @@ -315,6 +328,7 @@ func run(o *Options) error { ifaceStore, nodeConfig.Name, podUpdateChannel, + externalNodeUpdateChannel, groupCounters, groupIDUpdates, antreaPolicyEnabled, @@ -323,6 +337,7 @@ func run(o *Options) error { loggingEnabled, asyncRuleDeleteInterval, o.config.DNSServerOverride, + o.nodeType, v4Enabled, v6Enabled) if err != nil { @@ -386,6 +401,7 @@ func run(o *Options) error { var cniServer *cniserver.CNIServer var cniPodInfoStore cnipodcache.CNIPodInfoStore + var externalNodeController *externalnode.ExternalNodeController if o.nodeType == config.K8sNode { isChaining := false if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { @@ -415,6 +431,12 @@ func run(o *Options) error { return fmt.Errorf("error initializing CNI server: %v", err) } } + } else { + externalNodeController, err = externalnode.NewExternalNodeController(ovsBridgeClient, ofClient, externalNodeInformer, + ifaceStore, externalNodeUpdateChannel, o.config.Namespace) + if err != nil { + return fmt.Errorf("error creating ExternalNode controller: %v", err) + } } var traceflowController *traceflow.Controller @@ -507,6 +529,9 @@ func run(o *Options) error { go podUpdateChannel.Run(stopCh) go cniServer.Run(stopCh) go nodeRouteController.Run(stopCh) + } else { + go externalNodeUpdateChannel.Run(stopCh) + go externalNodeController.Run(stopCh) } if networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec && diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 1479962b610..4e1621a404d 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -489,4 +489,7 @@ func (o *Options) setExternalNodeDefaultOptions() { o.config.EnablePrometheusMetrics = new(bool) *o.config.EnablePrometheusMetrics = false } + if o.config.Namespace == "" { + o.config.Namespace = "default" + } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 6d1233d414e..198fc15ca28 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/noderoute" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -44,6 +45,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/agent/wireguard" + "antrea.io/antrea/pkg/client/clientset/versioned" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/ovs/ovsctl" @@ -81,6 +83,7 @@ var otherConfigKeysForIPsecCertificates = []string{"certificate", "private_key", // Initializer knows how to setup host networking, OpenVSwitch, and Openflow. type Initializer struct { client clientset.Interface + crdClient versioned.Interface ovsBridgeClient ovsconfig.OVSBridgeClient ofClient openflow.Client routeClient route.Interface @@ -102,10 +105,12 @@ type Initializer struct { networkReadyCh chan<- struct{} stopCh <-chan struct{} nodeType config.NodeType + namespace string } func NewInitializer( k8sClient clientset.Interface, + crdClient versioned.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -120,6 +125,7 @@ func NewInitializer( networkReadyCh chan<- struct{}, stopCh <-chan struct{}, nodeType config.NodeType, + namespace string, enableProxy bool, proxyAll bool, connectUplinkToBridge bool, @@ -127,6 +133,7 @@ func NewInitializer( return &Initializer{ ovsBridgeClient: ovsBridgeClient, client: k8sClient, + crdClient: crdClient, ifaceStore: ifaceStore, ofClient: ofClient, routeClient: routeClient, @@ -140,6 +147,7 @@ func NewInitializer( networkReadyCh: networkReadyCh, stopCh: stopCh, nodeType: nodeType, + namespace: namespace, enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, @@ -279,9 +287,17 @@ func (i *Initializer) initInterfaceStore() error { case interfacestore.AntreaTunnel: intf = parseTunnelInterfaceFunc(port, ovsPort) case interfacestore.AntreaHost: - // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a - // specific interface in the interfaceStore. - intf = nil + if port.Name == i.ovsBridge { + // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a + // specific interface in the interfaceStore. + intf = nil + } else { + var err error + intf, err = externalnode.ParseHostInterfaceConfig(i.ovsBridgeClient, port, ovsPort) + if err != nil { + return err + } + } case interfacestore.AntreaContainer: // The port should be for a container interface. intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true) @@ -1153,6 +1169,15 @@ func (i *Initializer) initNodeLocalConfig() error { } func (i *Initializer) initVMLocalConfig(nodeName string) error { + klog.InfoS("Initialize VM config", "ExternalNode", nodeName) + wait.PollImmediateUntil(time.Minute, func() (done bool, err error) { + _, err = i.crdClient.CrdV1alpha1().ExternalNodes(i.namespace).Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return true, nil + + }, i.stopCh) i.nodeConfig = &config.NodeConfig{ Name: nodeName, Type: config.ExternalNode, diff --git a/pkg/agent/config/node_config.go b/pkg/agent/config/node_config.go index fdd3d2497ac..6c147928203 100644 --- a/pkg/agent/config/node_config.go +++ b/pkg/agent/config/node_config.go @@ -96,6 +96,8 @@ type AdapterNetConfig struct { Index int MAC net.HardwareAddr IP *net.IPNet + IPs []*net.IPNet + MTU int Gateway string DNSServers string Routes []interface{} diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index e0fc264f260..c9b0e6e94d4 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -27,12 +27,14 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/metrics" agenttypes "antrea.io/antrea/pkg/agent/types" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/externalnode" "antrea.io/antrea/pkg/util/k8s" ) @@ -336,7 +338,8 @@ func toServicesIndexFunc(obj interface{}) ([]string, error) { } // newRuleCache returns a new *ruleCache. -func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache { +func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, externalNodeUpdateSubscriber channel.Subscriber, + serviceGroupIDUpdate <-chan string, nodeType config.NodeType) *ruleCache { rules := cache.NewIndexer( ruleKeyFunc, cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc, toServicesIndex: toServicesIndexFunc}, @@ -349,8 +352,14 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub dirtyRuleHandler: dirtyRuleHandler, groupIDUpdates: serviceGroupIDUpdate, } - // Subscribe Pod update events from CNIServer. - podUpdateSubscriber.Subscribe(cache.processPodUpdate) + if nodeType == config.K8sNode { + // Subscribe Pod update events from CNIServer. + podUpdateSubscriber.Subscribe(cache.processPodUpdate) + } else { + // Subscribe ExternalNode update events from ExternalNodeController + externalNodeUpdateSubscriber.Subscribe(cache.processExternalNodeUpdate) + } + go cache.processGroupIDUpdates() return cache } @@ -379,6 +388,31 @@ func (c *ruleCache) processPodUpdate(e interface{}) { } } +// processExternalNodeUpdate will be called when ExternalNodeController publishes an ExternalNode update event. +// It finds out AppliedToGroups that contains this ExternalNode converted ExternalEntity and trigger reconciling +// of related rules. +// It can enforce NetworkPolicies to ExternalEntities after ExternalEntityInterface is realised in the interface store. +func (c *ruleCache) processExternalNodeUpdate(e interface{}) { + externalNode := e.(*crdv1alpha1.ExternalNode) + eeName := externalnode.GenExternalEntityName(externalNode) + c.appliedToSetLock.RLock() + defer c.appliedToSetLock.RUnlock() + externalEntityEquals := func(ee *v1beta.ExternalEntityReference, namespace string, name string) bool { + if ee.Namespace == namespace && ee.Name == name { + return true + } + return false + } + for group, memberSet := range c.appliedToSetByGroup { + for _, member := range memberSet.Items() { + if externalEntityEquals(member.ExternalEntity, externalNode.Namespace, eeName) { + c.onAppliedToGroupUpdate(group) + break + } + } + } +} + // processGroupIDUpdates is an infinite loop that takes Service groupID // update events from the channel, finds out rules that refer this Service in // ToServices field and use dirtyRuleHandler to re-queue these rules. diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 00c01d3ed25..9c0e0120993 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" @@ -278,7 +279,7 @@ func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, *channel.SubscribableCh recorder := newDirtyRuleRecorder() podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) ch2 := make(chan string, 100) - c := newRuleCache(recorder.Record, podUpdateChannel, ch2) + c := newRuleCache(recorder.Record, podUpdateChannel, nil, ch2, config.K8sNode) return c, recorder, podUpdateChannel } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index c0c06ccfd70..1b44b638db6 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" @@ -114,6 +115,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ifaceStore interfacestore.InterfaceStore, nodeName string, podUpdateSubscriber channel.Subscriber, + externalNodeUpdateSubscriber channel.Subscriber, groupCounters []proxytypes.GroupCounter, groupIDUpdates <-chan string, antreaPolicyEnabled bool, @@ -122,6 +124,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, loggingEnabled bool, asyncRuleDeleteInterval time.Duration, dnsServerOverride string, + nodeType config.NodeType, v4Enabled bool, v6Enabled bool) (*Controller, error) { idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID) @@ -144,7 +147,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } } c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled) - c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates) + c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalNodeUpdateSubscriber, groupIDUpdates, nodeType) if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index b7f54207c57..060e4d67261 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -31,6 +31,7 @@ import ( k8stesting "k8s.io/client-go/testing" "k8s.io/component-base/metrics/legacyregistry" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow" proxytypes "antrea.io/antrea/pkg/agent/proxy/types" @@ -58,8 +59,8 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator(false) groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, groupCounters, ch2, - true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53", true, false) + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, + true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false) reconciler := newMockReconciler() controller.reconciler = reconciler controller.antreaPolicyLogger = nil diff --git a/pkg/agent/controller/networkpolicy/reconciler.go b/pkg/agent/controller/networkpolicy/reconciler.go index f5b062666d5..77849cf60d4 100644 --- a/pkg/agent/controller/networkpolicy/reconciler.go +++ b/pkg/agent/controller/networkpolicy/reconciler.go @@ -873,22 +873,22 @@ func (r *reconciler) GetRuleByFlowID(ruleFlowID uint32) (*types.PolicyRule, bool func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 { ofPorts := sets.NewInt32() for _, m := range members { - var entityName, ns string var ifaces []*interfacestore.InterfaceConfig + var name, ns string if m.Pod != nil { - entityName, ns = m.Pod.Name, m.Pod.Namespace - ifaces = r.ifaceStore.GetContainerInterfacesByPod(entityName, ns) + name, ns = m.Pod.Name, m.Pod.Namespace + ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns) } else if m.ExternalEntity != nil { - entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace - ifaces = r.ifaceStore.GetInterfacesByEntity(entityName, ns) + name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns) } if len(ifaces) == 0 { // This might be because the container has been deleted during realization or hasn't been set up yet. - klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName) + klog.Infof("Can't find interface for %s/%s, skipping", ns, name) continue } for _, iface := range ifaces { - klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, entityName) + klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, name) ofPorts.Insert(iface.OFPort) } } @@ -898,24 +898,24 @@ func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 { func (r *reconciler) getIPs(members v1beta2.GroupMemberSet) sets.String { ips := sets.NewString() for _, m := range members { - var entityName, ns string var ifaces []*interfacestore.InterfaceConfig + var name, ns string if m.Pod != nil { - entityName, ns = m.Pod.Name, m.Pod.Namespace - ifaces = r.ifaceStore.GetContainerInterfacesByPod(entityName, ns) + name, ns = m.Pod.Name, m.Pod.Namespace + ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns) } else if m.ExternalEntity != nil { - entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace - ifaces = r.ifaceStore.GetInterfacesByEntity(entityName, ns) + name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace + ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns) } if len(ifaces) == 0 { // This might be because the container has been deleted during realization or hasn't been set up yet. - klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName) + klog.Infof("Can't find interface for %s/%s, skipping", ns, name) continue } for _, iface := range ifaces { for _, ipAddr := range iface.IPs { if ipAddr != nil { - klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, entityName) + klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, name) ips.Insert(ipAddr.String()) } } diff --git a/pkg/agent/controller/networkpolicy/status_controller_test.go b/pkg/agent/controller/networkpolicy/status_controller_test.go index d719e8baa10..8db35fa0bd9 100644 --- a/pkg/agent/controller/networkpolicy/status_controller_test.go +++ b/pkg/agent/controller/networkpolicy/status_controller_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" ) @@ -51,7 +52,7 @@ func (c *fakeNetworkPolicyControl) getNetworkPolicyStatus() *v1beta2.NetworkPoli } func newTestStatusController() (*StatusController, *ruleCache, *fakeNetworkPolicyControl) { - ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), make(chan string, 100)) + ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), nil, make(chan string, 100), config.K8sNode) statusControl := &fakeNetworkPolicyControl{} statusController := newStatusController(nil, testNode1, ruleCache) statusController.statusControlInterface = statusControl diff --git a/pkg/agent/externalnode/external_node_controller.go b/pkg/agent/externalnode/external_node_controller.go new file mode 100644 index 00000000000..5b5af8b9057 --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller.go @@ -0,0 +1,753 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import ( + "fmt" + "net" + "reflect" + "strconv" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + eninformer "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + enlister "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + binding "antrea.io/antrea/pkg/ovs/openflow" + "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/ovs/ovsctl" + "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/env" + "antrea.io/antrea/pkg/util/externalnode" + "antrea.io/antrea/pkg/util/ip" +) + +const ( + controllerName = "ExternalNodeController" + // How long to wait before retrying the processing of an ExternalNode change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Disable resyncing. + resyncPeriod time.Duration = 0 + + ovsExternalIDUplinkName = "uplink-name" + ovsExternalIDHostIFName = "hostInterface-name" + ovsExternalIDHostIFIndex = "hostInterface-index" + ovsExternalIDUplinkPort = "uplink-port" + ovsExternalIDEntityName = "entity-name" + ovsExternalIDEntityNamespace = "entity-namespace" + ovsExternalIDIP = "ip" + eeSplitter = "," + reserveRuleSplitter = ":" +) + +var ( + keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc + splitKeyFunc = cache.SplitMetaNamespaceKey +) + +type ExternalNodeController struct { + ovsBridgeClient ovsconfig.OVSBridgeClient + ovsctlClient ovsctl.OVSCtlClient + ofClient openflow.Client + externalNodeInformer cache.SharedIndexInformer + externalNodeLister enlister.ExternalNodeNamespaceLister + externalNodeListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + ifaceStore interfacestore.InterfaceStore + syncedExternalNodes cache.Store + // nodeUpdateNotifier is used for notifying updates of local ExternalNode to NetworkPolicyController. + nodeUpdateNotifier channel.Notifier + nodeName string + namespace string + reservedHostPorts []reserveHostPort + reservedRules []string +} + +type reserveHostPort struct { + // The protocol (TCP, UDP, or SCTP) which traffic must match. + protocol binding.Protocol + // The dst port on the given protocol. + port uint16 + // The remote IP to which is reserved. + ip net.IP + // The remote CIDR to which is reserved. + ipnet *net.IPNet + ingress bool +} + +func NewExternalNodeController(ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, externalNodeInformer eninformer.ExternalNodeInformer, + ifaceStore interfacestore.InterfaceStore, nodeUpdateNotifier channel.Notifier, namespace string) (*ExternalNodeController, error) { + c := &ExternalNodeController{ + ovsBridgeClient: ovsBridgeClient, + ovsctlClient: ovsctl.NewClient(ovsBridgeClient.GetBridgeName()), + ofClient: ofClient, + externalNodeInformer: externalNodeInformer.Informer(), + externalNodeLister: externalNodeInformer.Lister().ExternalNodes(namespace), + externalNodeListerSynced: externalNodeInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalNode"), + ifaceStore: ifaceStore, + syncedExternalNodes: cache.NewStore(keyFunc), + nodeUpdateNotifier: nodeUpdateNotifier, + } + nodeName, err := env.GetNodeName() + if err != nil { + return nil, err + } + c.nodeName = nodeName + c.namespace = namespace + c.externalNodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueExternalNodeAdd, + UpdateFunc: c.enqueueExternalNodeUpdate, + DeleteFunc: c.enqueueExternalNodeDelete, + }, + resyncPeriod) + + return c, nil +} + +func (c *ExternalNodeController) enqueueExternalNodeAdd(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + if en.Name != c.nodeName || en.Namespace != c.namespace { + return + } + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode ADD event", "ExternalNode", klog.KObj(en)) +} + +func (c *ExternalNodeController) enqueueExternalNodeUpdate(oldObj interface{}, newObj interface{}) { + oldEn := oldObj.(*v1alpha1.ExternalNode) + newEn := newObj.(*v1alpha1.ExternalNode) + if newEn.Name != c.nodeName || newEn.Namespace != c.namespace { + return + } + if reflect.DeepEqual(oldEn.Spec.Interfaces, newEn.Spec.Interfaces) { + klog.InfoS("Skip enqueuing ExternalNode UPDATE event as no changes for interfaces", "ExternalNode", klog.KObj(newEn)) + return + } + key, _ := keyFunc(newEn) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode UPDATE event", "ExternalNode", klog.KObj(newEn)) +} + +func (c *ExternalNodeController) enqueueExternalNodeDelete(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + if en.Name != c.nodeName || en.Namespace != c.namespace { + return + } + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode DELETE event", "ExternalNode", klog.KObj(en)) +} + +// Run will create defaultWorkers workers (goroutines) which will process the ExternalNode events from the work queue. +func (c *ExternalNodeController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.Infof("Starting %s", controllerName) + defer klog.Infof("Shutting down %s", controllerName) + + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.externalNodeListerSynced) { + klog.Error("Failed to wait for syncing cache for ExternalNodes") + return + } + + if err := c.reconcile(); err != nil { + klog.Errorf("Failed to reconcile %v", err) + return + } + + go wait.Until(c.worker, time.Second, stopCh) + + <-stopCh +} + +func (c *ExternalNodeController) reconcile() error { + klog.Info("ExternalNodeController starts reconciliation") + if err := c.reconcileHostUplinkFlows(); err != nil { + return fmt.Errorf("failed to reconcile host uplink flows %v", err) + } + if err := c.reconcileExternalEntityInterfaces(); err != nil { + return fmt.Errorf("failed to reconcile ExternalEntity interfaces %v", err) + } + if err := c.reconcilePolicyBypassFlows(); err != nil { + return fmt.Errorf("failed to reconcile reserved flows %v", err) + } + klog.Info("ExternalNodeController finishes reconciliation") + return nil +} + +func (c *ExternalNodeController) reconcileHostUplinkFlows() error { + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if err := c.ofClient.InstallVMUplinkFlows(hostIface.InterfaceName, hostIface.OVSPortConfig.OFPort, hostIface.UplinkPort.OFPort); err != nil { + klog.ErrorS(err, "Failed to install openflow entries to forward packet between uplink and host interface", "hostInterface", hostIface.InterfaceName) + return err + } + klog.InfoS("Reconcile host uplink flow for ExternalEntityInterface", "ifName", hostIface.InterfaceName) + } + return nil +} + +func (c *ExternalNodeController) reconcileExternalEntityInterfaces() error { + en, err := c.externalNodeLister.Get(c.nodeName) + if err != nil { + return err + } + if err = c.addExternalNode(en); err != nil { + return err + } + eeName := externalnode.GenExternalEntityName(en) + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if hostIface.EntityName != eeName || hostIface.EntityNamespace != en.Namespace { + if err = c.deleteInterface(hostIface.InterfaceName); err != nil { + return err + } + } + } + return nil +} + +func (c *ExternalNodeController) reconcilePolicyBypassFlows() error { + if err := c.getReservedHostPorts(); err != nil { + return err + } + for _, rhp := range c.reservedHostPorts { + klog.V(2).InfoS("Installing reserved flows", "protocol", rhp.protocol, "IP", rhp.ip, "IPNet", rhp.ipnet, "port", rhp.port, "ingress", rhp.ingress) + if err := c.ofClient.InstallPolicyBypassFlows(rhp.protocol, rhp.ipnet, rhp.ip, rhp.port, rhp.ingress); err != nil { + klog.ErrorS(err, "Failed to install reserved flows", "protocol", rhp.protocol, "IP", rhp.ip, "IPNet", rhp.ipnet, "port", rhp.port, "direction", rhp.ingress) + return err + } + } + klog.InfoS("Installed reserved flows") + return nil +} + +func (c *ExternalNodeController) getReservedHostPorts() error { + for _, s := range c.reservedRules { + rule := strings.Split(s, reserveRuleSplitter) + + // parse direction. + direction := false + if rule[0] == "in" { + direction = true + } + + // parse protocol. + var proto binding.Protocol + switch rule[1] { + case "tcp": + proto = binding.ProtocolTCP + case "udp": + proto = binding.ProtocolUDP + case "icmp": + proto = binding.ProtocolICMP + default: + proto = binding.ProtocolIP + } + + // parse remote IP or CIDR. + var remoteCIDR *net.IPNet + var remoteIP net.IP + var err error + if rule[2] != "" { + if strings.Contains(rule[2], "/") { + _, remoteCIDR, err = net.ParseCIDR(rule[2]) + if err != nil { + return err + } + } else { + remoteIP = net.ParseIP(rule[2]) + } + } + + // parse port number. + var port int + if rule[3] != "" { + port, _ = strconv.Atoi(rule[3]) + } + c.reservedHostPorts = append(c.reservedHostPorts, reserveHostPort{ + ipnet: remoteCIDR, + ip: remoteIP, + port: uint16(port), + protocol: proto, + ingress: direction, + }) + } + return nil +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the work queue. +func (c *ExternalNodeController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *ExternalNodeController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + if key, ok := obj.(string); !ok { + c.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncExternalNode(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.Errorf("Error syncing ExternalNode %s, requeuing. Error: %v", key, err) + } + return true +} + +func (c *ExternalNodeController) syncExternalNode(key string) error { + _, name, err := splitKeyFunc(key) + if err != nil { + // This err should not occur. + return err + } + en, err := c.externalNodeLister.Get(name) + if errors.IsNotFound(err) { + return c.deleteExternalNode(key) + } + preEn, exists, _ := c.syncedExternalNodes.GetByKey(key) + if !exists { + return c.addExternalNode(en) + } else { + return c.updateExternalNode(preEn, en) + } +} + +func (c *ExternalNodeController) addExternalNode(en *v1alpha1.ExternalNode) error { + klog.InfoS("Adding ExternalNode", "ExternalNode", klog.KObj(en)) + eeName := externalnode.GenExternalEntityName(en) + ifName, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.addInterface(ifName, en.Namespace, eeName, en.Spec.Interfaces[0].IPs); err != nil { + return err + } + c.syncedExternalNodes.Add(en) + // Notify the ExternalNode create event to NetworkPolicyController. + c.nodeUpdateNotifier.Notify(en) + return nil +} + +func (c *ExternalNodeController) addInterface(ifName string, eeNamespace string, eeName string, ips []string) error { + hostIface, portExists := c.ifaceStore.GetInterfaceByName(ifName) + if !portExists { + klog.InfoS("Creating OVS ports and flows for ExternalEntityInterface", "ifName", ifName, "eeName", eeName, "ip", ips) + uplinkName := genUplinkInterfaceName(ifName) + iface, err := c.createOVSPortsAndFlows(uplinkName, ifName, eeNamespace, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil + } + klog.InfoS("Updating OVS port data", "ifName", ifName, "eeName", eeName, "ip", ips) + iface, err := c.updateOVSPortsData(hostIface, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil +} + +func (c *ExternalNodeController) updateExternalNode(obj interface{}, curEn *v1alpha1.ExternalNode) error { + klog.InfoS("Updating ExternalNode", "ExternalNode", klog.KObj(curEn)) + preEn := obj.(*v1alpha1.ExternalNode) + preEEName := externalnode.GenExternalEntityName(preEn) + preIfName, err := getHostInterfaceName(preEn.Spec.Interfaces[0]) + if err != nil { + return err + } + curEEName := externalnode.GenExternalEntityName(curEn) + curIfName, err := getHostInterfaceName(curEn.Spec.Interfaces[0]) + if err != nil { + return err + } + if preIfName != curIfName { + klog.InfoS("Found interface name is changed", "preName", preIfName, "curName", curIfName) + if err = c.deleteInterface(preIfName); err != nil { + return err + } + if err = c.addInterface(curIfName, curEn.Namespace, curEEName, curEn.Spec.Interfaces[0].IPs); err != nil { + return err + } + } else if (!reflect.DeepEqual(preEn.Spec.Interfaces[0].IPs, curEn.Spec.Interfaces[0].IPs)) || (preEEName != curEEName) { + klog.InfoS("Found interface configuration is changed", "preIPs", preEn.Spec.Interfaces[0].IPs, "preEEName", preEEName, + "curIPs", curEn.Spec.Interfaces[0].IPs, "curEEName", curEEName) + if err = c.addInterface(curIfName, curEn.Namespace, curEEName, curEn.Spec.Interfaces[0].IPs); err != nil { + return err + } + } + c.syncedExternalNodes.Add(curEn) + // Notify the ExternalNode update event to NetworkPolicyController. + c.nodeUpdateNotifier.Notify(curEn) + return nil +} + +func (c *ExternalNodeController) deleteExternalNode(key string) error { + klog.InfoS("Deleting ExternalNode", "key", key) + obj, exists, _ := c.syncedExternalNodes.GetByKey(key) + if !exists { + klog.InfoS("Skipping ExternalNode deletion as it hasn't been synced", "ExternalEntityKey", key) + return nil + } + en := obj.(*v1alpha1.ExternalNode) + ifName, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.deleteInterface(ifName); err != nil { + return err + } + c.syncedExternalNodes.Delete(en) + return nil +} + +func (c *ExternalNodeController) deleteInterface(ifName string) error { + hostIface, portExists := c.ifaceStore.GetInterfaceByName(ifName) + if !portExists { + klog.InfoS("Skipping deleting host interface since it doesn't exist ", "ifName", ifName) + return nil + } + klog.InfoS("Deleting interface", "ifName", ifName) + if err := c.removeOVSPortsAndFlows(hostIface); err != nil { + return err + } + c.ifaceStore.DeleteInterface(hostIface) + return nil +} + +func (c *ExternalNodeController) createOVSPortsAndFlows(uplinkName, hostIfName, eeNamespace string, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + adapterConfig, err := util.GetUplinkConfig(hostIfName) + if err != nil { + klog.ErrorS(err, "Failed to get the configuration on the host interface", "hostInterface", hostIfName) + return nil, err + } + if err := util.RenameInterface(hostIfName, uplinkName); err != nil { + klog.ErrorS(err, "Failed to rename host interface name as the uplink name", "hostInterface", hostIfName, "uplink", uplinkName) + return nil, err + } + success := false + defer func() { + if !success { + util.RenameInterface(uplinkName, hostIfName) + } + }() + // Create uplink port on OVS. + uplinkExternalIDs := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaUplink, + } + uplinkUUID, err := c.ovsBridgeClient.CreatePort(uplinkName, uplinkName, uplinkExternalIDs) + if err != nil { + klog.ErrorS(err, "Failed to create uplink port on OVS", "uplink", uplinkName) + return nil, err + } + defer func() { + if !success { + c.ovsBridgeClient.DeletePort(uplinkUUID) + } + }() + uplinkOFPort, err := c.ovsBridgeClient.GetOFPort(uplinkName, false) + if err != nil { + klog.ErrorS(err, "Failed to get uplink ofport", "uplink", uplinkName) + return nil, err + } + klog.V(2).InfoS("Added uplink port on OVS", "port", uplinkName) + + // Create host port on OVS. + hostIfUUID, err := c.ovsBridgeClient.CreateInternalPort(hostIfName, 0, nil) + if err != nil { + klog.ErrorS(err, "Failed to create host port on OVS", "hostInterface", hostIfName) + return nil, err + } + defer func() { + if !success { + c.ovsBridgeClient.DeletePort(hostIfUUID) + } + }() + hostOFPort, err := c.ovsBridgeClient.GetOFPort(hostIfName, false) + if err != nil { + klog.ErrorS(err, "Failed to get host interface ofport", "hostInterface", uplinkName) + return nil, err + } + klog.V(2).InfoS("Added host port on OVS", "port", hostIfName) + + hostIF, err := net.InterfaceByName(hostIfName) + if err != nil { + return nil, err + } + attachInfo := getOVSAttachInfo(uplinkName, uplinkUUID, fmt.Sprintf("%d", hostIF.Index), hostIF.Name, eeName, eeNamespace, ips) + // Update OVS port external_ids with the host interface id and name. + if err := c.ovsBridgeClient.SetPortExternalIDs(hostIfName, attachInfo); err != nil { + return nil, err + } + if err := util.MoveIFConfigurations(adapterConfig.IPs, adapterConfig.Routes, adapterConfig.MAC, adapterConfig.MTU, uplinkName, hostIfName); err != nil { + klog.ErrorS(err, "Failed to move configuration from the host interface to uplink", "hostInterface", hostIfName, "uplink", uplinkName) + return nil, err + } + klog.V(2).InfoS("Moved configuration from the uplink to host port", "uplink", uplinkName, "hostInterface", hostIfName) + if err := c.ofClient.InstallVMUplinkFlows(hostIfName, hostOFPort, uplinkOFPort); err != nil { + return nil, err + } + klog.InfoS("Added uplink and host port on OVS and installed openflow entries", "uplink", uplinkName, "hostInterface", hostIfName) + success = true + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + hostIFConfig := &interfacestore.InterfaceConfig{ + Type: interfacestore.ExternalEntityInterface, + InterfaceName: hostIfName, + IPs: ifIPs, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: hostIfUUID, + OFPort: hostOFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: eeNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkUUID, + OFPort: uplinkOFPort, + }, + HostIfaceIndex: hostIF.Index, + }, + } + return hostIFConfig, nil +} + +func getOVSAttachInfo(uplinkName, uplinkUUID, hostIFIdx, hostPortName, entityName, entityNamespace string, ips []string) map[string]interface{} { + attachInfo := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + if uplinkName != "" { + attachInfo[ovsExternalIDUplinkName] = uplinkName + } + if uplinkUUID != "" { + attachInfo[ovsExternalIDUplinkPort] = uplinkUUID + } + if hostIFIdx != "" { + attachInfo[ovsExternalIDHostIFIndex] = hostIFIdx + } + if hostPortName != "" { + attachInfo[ovsExternalIDHostIFName] = hostPortName + } + if entityName != "" { + attachInfo[ovsExternalIDEntityName] = entityName + } + if entityNamespace != "" { + attachInfo[ovsExternalIDEntityNamespace] = entityNamespace + } + if len(ips) != 0 { + attachInfo[ovsExternalIDIP] = strings.Join(ips, eeSplitter) + } + + return attachInfo +} + +func (c *ExternalNodeController) updateOVSPortsData(interfaceConfig *interfacestore.InterfaceConfig, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + portUUID := interfaceConfig.PortUUID + portName := interfaceConfig.InterfaceName + port, err := c.ovsBridgeClient.GetPortData(portUUID, portName) + if err != nil { + return nil, err + } + + attachInfo := map[string]interface{}{ + ovsExternalIDUplinkName: port.ExternalIDs[ovsExternalIDUplinkName], + ovsExternalIDHostIFIndex: port.ExternalIDs[ovsExternalIDHostIFIndex], + ovsExternalIDHostIFName: port.ExternalIDs[ovsExternalIDHostIFName], + ovsExternalIDUplinkPort: port.ExternalIDs[ovsExternalIDUplinkPort], + ovsExternalIDEntityName: eeName, + ovsExternalIDEntityNamespace: port.ExternalIDs[ovsExternalIDEntityName], + ovsExternalIDIP: strings.Join(ips, eeSplitter), + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + err = c.ovsBridgeClient.SetPortExternalIDs(portName, attachInfo) + if err != nil { + return nil, err + } + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + iface := &interfacestore.InterfaceConfig{ + InterfaceName: interfaceConfig.InterfaceName, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.PortUUID, + OFPort: interfaceConfig.OFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: interfaceConfig.EntityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.UplinkPort.PortUUID, + OFPort: interfaceConfig.UplinkPort.OFPort, + }, + HostIfaceIndex: interfaceConfig.EntityInterfaceConfig.HostIfaceIndex, + }, + IPs: ifIPs, + } + return iface, nil +} + +func (c *ExternalNodeController) removeOVSPortsAndFlows(interfaceConfig *interfacestore.InterfaceConfig) error { + portUUID := interfaceConfig.PortUUID + portName := interfaceConfig.InterfaceName + if err := c.ofClient.UninstallVMUplinkFlows(portName); err != nil { + return err + } + port, err := c.ovsBridgeClient.GetPortData(portUUID, portName) + if err != nil { + if strings.Contains(err.Error(), "not found") { + klog.ErrorS(err, "OVS port is not found", "port", portName) + return nil + } + return err + } + hostIfName := port.ExternalIDs[ovsExternalIDHostIFName] + uplinkIfName := port.ExternalIDs[ovsExternalIDUplinkName] + uplinkPortID := port.ExternalIDs[ovsExternalIDUplinkPort] + adapterConfig, er := util.GetUplinkConfig(hostIfName) + if er != nil { + klog.ErrorS(err, "Failed to get the configuration on the host interface", "hostInterface", hostIfName) + return err + } + if err := c.ovsBridgeClient.DeletePort(port.UUID); err != nil { + klog.ErrorS(err, "Failed to delete port on OVS", "port", hostIfName) + return err + } + if err := c.ovsBridgeClient.DeletePort(uplinkPortID); err != nil { + return err + } + defer func() { + // Delete host interface from OVS datapath if it is existing. + // This is to resolve an issue that OVS fails to remove the interface from datapath. It might happen because the interface + // is busy when OVS tries to remove it with the OVSDB interface deletion event. + if err := c.ovsctlClient.DeleteDPInterface(hostIfName); err != nil { + klog.ErrorS(err, "Failed delete host interface from OVS datapath", "interface", hostIfName) + } + }() + + // Wait until the host interface created by OVS is removed. + pollErr := wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (bool, error) { + return !util.HostInterfaceExists(hostIfName), nil + }) + if pollErr != nil { + klog.ErrorS(pollErr, "Failed to wait for host interface deletion in 2s", "interface", hostIfName) + } + // Recover the uplink interface's name. + if err := util.RenameInterface(uplinkIfName, hostIfName); err != nil { + klog.ErrorS(err, "Failed to recover uplink name to the host interface name", "uplink", uplinkIfName, "hostInterface", hostIfName) + return err + } + // Move the IP configurations back to the host interface. + if err := util.MoveIFConfigurations(adapterConfig.IPs, adapterConfig.Routes, adapterConfig.MAC, adapterConfig.MTU, uplinkIfName, hostIfName); err != nil { + klog.ErrorS(err, "Failed to move back configuration to the host interface", "hostInterface", hostIfName) + return err + } + return nil +} + +func getHostInterfaceName(iface v1alpha1.NetworkInterface) (string, error) { + var ipFilter *ip.DualStackIPs + epIP := net.ParseIP(iface.IPs[0]) + if epIP.To4() != nil { + ipFilter = &ip.DualStackIPs{IPv4: epIP} + } else { + ipFilter = &ip.DualStackIPs{IPv6: epIP} + } + _, _, link, err := util.GetIPNetDeviceFromIP(ipFilter, sets.NewString()) + if err == nil { + klog.InfoS("Using the interface", "linkName", link.Name) + return link.Name, nil + } + return "", err + +} + +func ParseHostInterfaceConfig(ovsBridgeClient ovsconfig.OVSBridgeClient, portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) (*interfacestore.InterfaceConfig, error) { + var interfaceConfig *interfacestore.InterfaceConfig + interfaceConfig = &interfacestore.InterfaceConfig{ + InterfaceName: portData.Name, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: portConfig, + } + var hostUplinkConfig *interfacestore.EntityInterfaceConfig + if portData.ExternalIDs != nil { + entityIPStrs := strings.Split(portData.ExternalIDs[ovsExternalIDIP], eeSplitter) + var entityIPs []net.IP + for _, ipStr := range entityIPStrs { + entityIPs = append(entityIPs, net.ParseIP(ipStr)) + } + interfaceConfig.IPs = entityIPs + uplinkName, _ := portData.ExternalIDs[ovsExternalIDUplinkName] + uplinkPortUUID, _ := portData.ExternalIDs[ovsExternalIDUplinkPort] + uplinkPortData, ovsErr := ovsBridgeClient.GetPortData(uplinkPortUUID, uplinkName) + if ovsErr != nil { + klog.Info("Failed to get uplink port data") + return nil, ovsErr + } + hostLinkStr := portData.ExternalIDs[ovsExternalIDHostIFIndex] + hostLink, err := strconv.Atoi(hostLinkStr) + if err != nil { + klog.Info("Failed to get interface name from port data") + return nil, err + } + entityName, _ := portData.ExternalIDs[ovsExternalIDEntityName] + entityNamespace, _ := portData.ExternalIDs[ovsExternalIDEntityNamespace] + hostUplinkConfig = &interfacestore.EntityInterfaceConfig{ + EntityName: entityName, + EntityNamespace: entityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkPortUUID, + OFPort: uplinkPortData.OFPort, + }, + HostIfaceIndex: hostLink, + } + } + interfaceConfig.EntityInterfaceConfig = hostUplinkConfig + return interfaceConfig, nil +} + +func genUplinkInterfaceName(hostIfName string) string { + return fmt.Sprintf("phy-%s", hostIfName) +} diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 0496adf8c7d..a28c491b01b 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -389,3 +389,19 @@ func GenerateRandomMAC() net.HardwareAddr { buf[0] |= 2 return buf } + +func GetGlobalIPNetsByName(link *net.Interface) ([]*net.IPNet, error) { + addrList, err := link.Addrs() + if err != nil { + return nil, err + } + var addrs []*net.IPNet + for _, a := range addrList { + if ipNet, ok := a.(*net.IPNet); ok { + if ipNet.IP.IsGlobalUnicast() { + addrs = append(addrs, ipNet) + } + } + } + return addrs, nil +} diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go index eb25c5b737f..9c0b4d5bdc4 100644 --- a/pkg/agent/util/net_linux.go +++ b/pkg/agent/util/net_linux.go @@ -23,11 +23,15 @@ import ( "os" "os/exec" "path/filepath" + "time" "github.com/containernetworking/plugins/pkg/ip" "github.com/containernetworking/plugins/pkg/ns" "github.com/vishvananda/netlink" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" ) // GetNetLink returns dev link from name. @@ -237,3 +241,172 @@ func DeleteOVSPort(brName, portName string) error { cmd := exec.Command("ovs-vsctl", "--if-exists", "del-port", brName, portName) return cmd.Run() } + +func GetNetRoutesOnAdapter(linkIndex int) (string, []interface{}, error) { + link, err := netlink.LinkByIndex(linkIndex) + if err != nil { + return "", nil, err + } + rs, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return "", nil, err + } + var gw string + var routes []interface{} + for _, r := range rs { + if r.Gw != nil && r.Dst != nil && r.Dst.IP.IsUnspecified() { + gw = r.Gw.String() + } + routes = append(routes, r) + } + return gw, routes, nil +} + +func RenameInterface(from, to string) error { + var renameErr error + pollErr := wait.Poll(time.Millisecond*100, time.Second, func() (done bool, err error) { + renameErr = renameHostInterface(from, to) + if renameErr != nil { + klog.InfoS("Unable to rename host interface name with error, retrying", "oldName", from, "newName", to, "err", renameErr) + return false, nil + } + return true, nil + }) + if pollErr != nil { + return fmt.Errorf("failed to rename host interface name %s to %s", from, to) + } + return nil +} + +func renameHostInterface(oriName string, newName string) error { + link, err := netlink.LinkByName(oriName) + if err != nil { + return err + } + if err := netlink.LinkSetDown(link); err != nil { + return err + } + defer func() { + netlink.LinkSetUp(link) + }() + if err := netlink.LinkSetName(link, newName); err != nil { + return err + } + return nil +} + +func MoveIFConfigurations(ips []*net.IPNet, routes []interface{}, mac net.HardwareAddr, mtu int, from string, to string) error { + toIF, err := netlink.LinkByName(to) + if err != nil { + return err + } + fromExists := true + fromIF, err := netlink.LinkByName(from) + if err != nil { + if _, ok := err.(netlink.LinkNotFoundError); !ok { + return err + } + fromExists = false + } + if fromExists { + if err := netlink.LinkSetHardwareAddr(toIF, mac); err != nil { + return err + } + if err := netlink.LinkSetMTU(toIF, mtu); err != nil { + return err + } + if err := netlink.LinkSetUp(toIF); err != nil { + return err + } + if err := removeUplinkIPRoutes(fromIF); err != nil { + return err + } + } + toIndex := toIF.Attrs().Index + // Copy the uplink interface's IP to the veth interface. + if err := ConfigureLinkAddresses(toIndex, ips); err != nil { + return err + } + // Copy the uplink interface's Route to the veth interface. + if err := copyRoutes(toIF, routes); err != nil { + return err + } + return nil +} + +// removeUplinkIPRoutes removes the IP and Routes from the uplink interface. +func removeUplinkIPRoutes(uplinkIF netlink.Link) error { + addrs, err := netlink.AddrList(uplinkIF, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range addrs { + if err := netlink.AddrDel(uplinkIF, &addrs[i]); err != nil { + return err + } + } + return removeRoutesOnLink(uplinkIF) +} + +func removeRoutesOnLink(link netlink.Link) error { + routes, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range routes { + if err := netlink.RouteDel(&routes[i]); err != nil { + return err + } + } + return nil +} + +func copyRoutes(toIF netlink.Link, routes []interface{}) error { + for _, r := range routes { + rt := r.(netlink.Route) + rt.LinkIndex = toIF.Attrs().Index + if err := netlink.RouteReplace(&rt); err != nil { + return err + } + } + return nil +} + +func HostInterfaceExists(ifName string) bool { + _, err := netlink.LinkByName(ifName) + if err == nil { + return true + } + if _, ok := err.(netlink.LinkNotFoundError); ok { + return false + } + klog.ErrorS(err, "Failed to find host interface", "name", ifName) + return false +} + +func GetUplinkConfig(uplinkIfName string) (*config.AdapterNetConfig, error) { + iface, err := net.InterfaceByName(uplinkIfName) + if err != nil { + klog.ErrorS(err, "Failed to get interface", "uplinkIfName", uplinkIfName) + return nil, err + } + addrs, err := GetGlobalIPNetsByName(iface) + if err != nil { + klog.ErrorS(err, "Failed to get address", "iface", iface) + return nil, err + } + gw, routes, err := GetNetRoutesOnAdapter(iface.Index) + if err != nil { + klog.ErrorS(err, "Failed to get routes", "iface.Index", iface.Index) + return nil, err + } + return &config.AdapterNetConfig{ + Name: uplinkIfName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + Gateway: gw, + MTU: iface.MTU, + }, nil +} diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index ac5abc8c7a5..481e55e35ab 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" ps "antrea.io/antrea/pkg/agent/util/powershell" ) @@ -895,3 +896,61 @@ func ReplaceNetNeighbor(neighbor *Neighbor) error { func VirtualAdapterName(name string) string { return fmt.Sprintf("%s (%s)", ContainerVNICPrefix, name) } + +func GetUplinkConfig(uplinkIfName string) (*config.AdapterNetConfig, error) { + iface, err := net.InterfaceByName(uplinkIfName) + if err != nil { + klog.ErrorS(err, "Failed to get interface", "uplinkIfName", uplinkIfName) + return nil, err + } + addrs, err := GetGlobalIPNetsByName(iface) + if err != nil { + klog.ErrorS(err, "Failed to get address", "iface", iface) + return nil, err + } + gw, routes, err := GetNetRoutesOnAdapter(iface.Index) + if err != nil { + klog.ErrorS(err, "Failed to get routes", "iface.Index", iface.Index) + return nil, err + } + return &config.AdapterNetConfig{ + Name: uplinkIfName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + Gateway: gw, + MTU: iface.MTU, + }, nil +} + +// TODO: Implement MoveIFConfigurations for windows +func MoveIFConfigurations(ips []*net.IPNet, routes []interface{}, mac net.HardwareAddr, mtu int, from string, to string) error { + return nil +} + +// TODO: Implement GetNetRoutesOnAdapter for windows +func GetNetRoutesOnAdapter(linkIndex int) (string, []interface{}, error) { + return "", nil, nil +} + +func RenameInterface(from, to string) error { + var renameErr error + pollErr := wait.Poll(time.Millisecond*100, time.Second, func() (done bool, err error) { + renameErr = renameHostInterface(from, to) + if renameErr != nil { + klog.InfoS("Unable to rename host interface name with error, retrying", "oldName", from, "newName", to, "err", renameErr) + return false, nil + } + return true, nil + }) + if pollErr != nil { + return fmt.Errorf("failed to rename host interface name %s to %s", from, to) + } + return nil +} + +// TODO: Implement renameHostInterface for windowsß +func renameHostInterface(oriName string, newName string) error { + return nil +} diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 07f5d85b76e..f55ec63068f 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -197,6 +197,10 @@ type AgentConfig struct { // NodeType is type of the Node where Antrea Agent is running. // Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". NodeType string `yaml:"nodeType,omitempty"` + // The namespace to create the ExternalNode for the VM/BM object. + // The default value is "default". + // It is used only when NodeType is externalNode. + Namespace string `yaml:"namespace,omitempty"` } type AntreaProxyConfig struct { diff --git a/pkg/ovs/ovsctl/appctl.go b/pkg/ovs/ovsctl/appctl.go index cf1e36eb681..158b43c04a9 100644 --- a/pkg/ovs/ovsctl/appctl.go +++ b/pkg/ovs/ovsctl/appctl.go @@ -19,6 +19,7 @@ import ( "bytes" "fmt" "net" + "strconv" "strings" "k8s.io/klog/v2" @@ -227,3 +228,45 @@ func (c *ovsCtlClient) GetDPFeatures() (map[DPFeature]bool, error) { } return features, nil } + +func (c *ovsCtlClient) DeleteDPInterface(name string) error { + cmd := fmt.Sprintf("dpctl/show ovs-system") + out, err := c.runAppCtl(cmd, true) + if err == nil { + return nil + } + scanner := bufio.NewScanner(strings.NewReader(string(out))) + scanner.Split(bufio.ScanLines) + ports := make([]int, 0) + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, fmt.Sprintf(": %s", name)) { + continue + } + fields := strings.Split(line, ":") + if len(fields) != 2 { + klog.InfoS("Unexpected output from dpif/show ovs-system", "line", line) + continue + } + portStr := strings.Split(fields[0], " ")[1] + portNo, err := strconv.Atoi(portStr) + if err != nil { + klog.InfoS("Unable to parse port number", "port", portStr) + continue + } + ports = append(ports, portNo) + } + if len(ports) == 0 { + return nil + } + for _, port := range ports { + cmd = fmt.Sprintf("dpctl/del-if ovs-system %d", port) + _, err := c.runAppCtl(cmd, true) + if err == nil || strings.Contains(err.Error(), "No such device") { + return nil + } else { + return err + } + } + return nil +} diff --git a/pkg/ovs/ovsctl/interface.go b/pkg/ovs/ovsctl/interface.go index b6c74088621..0466ece467b 100644 --- a/pkg/ovs/ovsctl/interface.go +++ b/pkg/ovs/ovsctl/interface.go @@ -60,6 +60,8 @@ type OVSCtlClient interface { RunAppctlCmd(cmd string, needsBridge bool, args ...string) ([]byte, *ExecError) // GetDPFeatures executes "ovs-appctl dpif/show-dp-features" to check supported DP features. GetDPFeatures() (map[DPFeature]bool, error) + // DeleteDPInterface executes "vs-appctl dpctl/del-if ovs-system $name" to delete DP interface. + DeleteDPInterface(name string) error } type BadRequestError string diff --git a/pkg/ovs/ovsctl/testing/mock_ovsctl.go b/pkg/ovs/ovsctl/testing/mock_ovsctl.go index 05e0663d8a9..fc8b37f2c35 100644 --- a/pkg/ovs/ovsctl/testing/mock_ovsctl.go +++ b/pkg/ovs/ovsctl/testing/mock_ovsctl.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,6 +48,20 @@ func (m *MockOVSCtlClient) EXPECT() *MockOVSCtlClientMockRecorder { return m.recorder } +// DeleteDPInterface mocks base method +func (m *MockOVSCtlClient) DeleteDPInterface(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteDPInterface", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteDPInterface indicates an expected call of DeleteDPInterface +func (mr *MockOVSCtlClientMockRecorder) DeleteDPInterface(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteDPInterface", reflect.TypeOf((*MockOVSCtlClient)(nil).DeleteDPInterface), arg0) +} + // DumpFlows mocks base method func (m *MockOVSCtlClient) DumpFlows(arg0 ...string) ([]string, error) { m.ctrl.T.Helper()