From ba37b8c1fab602376225fadc092ef4c2ce52e4e3 Mon Sep 17 00:00:00 2001 From: Mengdie Song Date: Tue, 2 Aug 2022 17:49:12 +0800 Subject: [PATCH] [ExternalNode] Handle ExternalNode from Antrea agent side (#3799) 1. Provide an example RBAC yaml file for Antrea agent running on VM with definitions of ClusterRole, ServiceAccount and ClusterRoleBinding. 2. Add ExternalNodeController to monitor ExternalNode CRUD, invoke interfaces to operate OVS and update interface store with ExternalEntityInterface. 3. Implement OVS interactions related to ExternalNode CRUD. 4. Add a channel for receiving ExternalEntity updates from ExternalNodeController and notifying NetworkPolicyController to reconcile rules related to the updated ExternalEntities. This is to handle the case when NetworkPolicyController reconciles rules before ExternalEntityInterface is realized in the interface store. 5. Support configuring policy bypass rules to skip ANP check. Signed-off-by: Mengdie Song Co-authored-by: Wenying Dong --- build/charts/antrea/conf/antrea-agent.conf | 3 - build/yamls/antrea-aks.yml | 7 +- build/yamls/antrea-eks.yml | 7 +- build/yamls/antrea-gke.yml | 7 +- build/yamls/antrea-ipsec.yml | 7 +- build/yamls/antrea.yml | 7 +- .../yamls/externalnode/conf/antrea-agent.conf | 16 + build/yamls/externalnode/vm-agent-rbac.yml | 112 +++ cmd/antrea-agent/agent.go | 50 +- cmd/antrea-agent/options.go | 32 + pkg/agent/agent.go | 42 +- pkg/agent/agent_linux.go | 2 +- pkg/agent/agent_windows.go | 2 +- pkg/agent/config/node_config.go | 3 +- pkg/agent/controller/networkpolicy/cache.go | 34 +- .../controller/networkpolicy/cache_test.go | 3 +- .../networkpolicy/networkpolicy_controller.go | 8 +- .../networkpolicy_controller_test.go | 2 +- pkg/agent/controller/networkpolicy/reject.go | 21 +- .../networkpolicy/status_controller_test.go | 3 +- .../externalnode/external_node_controller.go | 682 ++++++++++++++++++ .../external_node_controller_linux.go | 59 ++ .../external_node_controller_windows.go | 21 + pkg/agent/interfacestore/types.go | 2 - pkg/agent/openflow/client.go | 2 +- .../openflow/externalnode_connectivity.go | 24 +- pkg/agent/openflow/testing/mock_openflow.go | 8 +- pkg/agent/util/net.go | 14 + pkg/agent/util/net_linux.go | 84 ++- pkg/agent/util/net_windows.go | 10 + pkg/config/agent/config.go | 25 + .../networkpolicy/networkpolicy_controller.go | 14 +- pkg/ovs/ovsctl/appctl.go | 36 + pkg/ovs/ovsctl/interface.go | 2 + pkg/ovs/ovsctl/testing/mock_ovsctl.go | 16 +- test/integration/agent/openflow_test.go | 8 +- 36 files changed, 1287 insertions(+), 88 deletions(-) create mode 100644 build/yamls/externalnode/vm-agent-rbac.yml create mode 100644 pkg/agent/externalnode/external_node_controller.go create mode 100644 pkg/agent/externalnode/external_node_controller_linux.go create mode 100644 pkg/agent/externalnode/external_node_controller_windows.go diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index d3046345bde..ed3f4dbd93c 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -67,9 +67,6 @@ featureGates: # Enable certificated-based authentication for IPsec. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "IPsecCertAuth" "default" false) }} -# Enable running agent on an unmanaged VM/BM. -{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "ExternalNode" "default" false) }} - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 4cdb3d11368..32ad10dfe00 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2686,9 +2686,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3787,7 +3784,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f + checksum/config: affad240c8b2b8575f7a93c12a08b7cf72aca2d978bda83d73d0120f45212877 labels: app: antrea component: antrea-agent @@ -4028,7 +4025,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f + checksum/config: affad240c8b2b8575f7a93c12a08b7cf72aca2d978bda83d73d0120f45212877 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index b3f487f37bf..0911527cd5d 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2686,9 +2686,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3787,7 +3784,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f + checksum/config: affad240c8b2b8575f7a93c12a08b7cf72aca2d978bda83d73d0120f45212877 labels: app: antrea component: antrea-agent @@ -4030,7 +4027,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f + checksum/config: affad240c8b2b8575f7a93c12a08b7cf72aca2d978bda83d73d0120f45212877 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index f46e83a00c8..f4e96e9c2c7 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2686,9 +2686,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3787,7 +3784,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 + checksum/config: 3677c7d305f558cd78b34ee0a71f786b0c53a3f19ad9eaa5eabf09aa2590164e labels: app: antrea component: antrea-agent @@ -4027,7 +4024,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 + checksum/config: 3677c7d305f558cd78b34ee0a71f786b0c53a3f19ad9eaa5eabf09aa2590164e labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 1fc7f11a751..9cbf3c21581 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2699,9 +2699,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3800,7 +3797,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 + checksum/config: 5e82c60c904bef6feb9d344aa5f283e3bb516250ffc1239aa1f46b99b07d5221 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -4086,7 +4083,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 + checksum/config: 5e82c60c904bef6feb9d344aa5f283e3bb516250ffc1239aa1f46b99b07d5221 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index f4b89a02978..da595cf130e 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2686,9 +2686,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3787,7 +3784,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa + checksum/config: f72e5c9f6a652693755b716796b9aa0d4b6e2f0c7b64fd2333197af96862c8b5 labels: app: antrea component: antrea-agent @@ -4027,7 +4024,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa + checksum/config: f72e5c9f6a652693755b716796b9aa0d4b6e2f0c7b64fd2333197af96862c8b5 labels: app: antrea component: antrea-controller diff --git a/build/yamls/externalnode/conf/antrea-agent.conf b/build/yamls/externalnode/conf/antrea-agent.conf index 55e8015bfa2..1fda25a069f 100644 --- a/build/yamls/externalnode/conf/antrea-agent.conf +++ b/build/yamls/externalnode/conf/antrea-agent.conf @@ -32,6 +32,22 @@ featureGates: # Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". nodeType: externalNode +externalNode: + # The expected Namespace in which the ExternalNode is created. + # Defaults to "default". + #externalNodeNamespace: default + + # The policyBypassRules describes the traffic that is expected to bypass NetworkPolicy rules. + # Each rule contains the following four attributes: + # direction (ingress|egress), protocol(tcp/udp/icmp/ip), remote CIDR, dst port (ICMP doesn't require). + # Here is an example: + # - direction: ingress + # protocol: tcp + # cidr: 1.1.1.1/32 + # port: 22 + # It is used only when NodeType is externalNode. + #policyBypassRules: [] + # The path to access the kubeconfig file used in the connection to K8s APIServer. The file contains the K8s # APIServer endpoint and the token of ServiceAccount required in the connection. clientConnection: diff --git a/build/yamls/externalnode/vm-agent-rbac.yml b/build/yamls/externalnode/vm-agent-rbac.yml new file mode 100644 index 00000000000..084cb742dc7 --- /dev/null +++ b/build/yamls/externalnode/vm-agent-rbac.yml @@ -0,0 +1,112 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +rules: + # antrea-controller distributes the CA certificate as a ConfigMap named `antrea-ca` in the Antrea deployment Namespace. + # vm-agent needs to access `antrea-ca` to connect with antrea-controller. + - apiGroups: + - "" + resources: + - configmaps + resourceNames: + - antrea-ca + verbs: + - get + - watch + - list + # This is the content of built-in role kube-system/extension-apiserver-authentication-reader. + # But it doesn't have list/watch permission before K8s v1.17.0 so the extension apiserver (vm-agent) will + # have permission issue after bumping up apiserver library to a version that supports dynamic authentication. + # See https://github.com/kubernetes/kubernetes/pull/85375 + # To support K8s clusters older than v1.17.0, we grant the required permissions directly instead of relying on + # the extension-apiserver-authentication role. + - apiGroups: + - "" + resourceNames: + - extension-apiserver-authentication + resources: + - configmaps + verbs: + - get + - list + - watch + - apiGroups: + - crd.antrea.io + resources: + - antreaagentinfos + verbs: + - get + - update + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies + - appliedtogroups + - addressgroups + verbs: + - get + - watch + - list + - apiGroups: + - controlplane.antrea.io + resources: + - nodestatssummaries + verbs: + - create + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies/status + verbs: + - create + - get +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: vm-agent +subjects: + - kind: ServiceAccount + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +rules: + - apiGroups: + - crd.antrea.io + resources: + - externalnodes + verbs: + - get + - watch + - list +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: vm-agent +subjects: + - kind: ServiceAccount + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 2a1b5a4a9c1..f00827dec55 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -42,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" "antrea.io/antrea/pkg/agent/controller/trafficcontrol" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" @@ -60,6 +61,7 @@ import ( "antrea.io/antrea/pkg/agent/stats" agenttypes "antrea.io/antrea/pkg/agent/types" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + crdv1alpha1informers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/log" @@ -226,6 +228,7 @@ func run(o *Options) error { // Initialize agent and node network. agentInitializer := agent.NewInitializer( k8sClient, + crdClient, ovsBridgeClient, ofClient, routeClient, @@ -240,6 +243,7 @@ func run(o *Options) error { networkReadyCh, stopCh, o.nodeType, + o.config.ExternalNode.ExternalNodeNamespace, features.DefaultFeatureGate.Enabled(features.AntreaProxy), o.config.AntreaProxy.ProxyAll, connectUplinkToBridge) @@ -328,7 +332,16 @@ func run(o *Options) error { // podUpdateChannel is a channel for receiving Pod updates from CNIServer and // notifying NetworkPolicyController and EgressController to reconcile rules // related to the updated Pods. - podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + var podUpdateChannel *channel.SubscribableChannel + // externalEntityUpdateChannel is a channel for receiving ExternalEntity updates from ExternalNodeController and + // notifying NetworkPolicyController to reconcile rules related to the updated ExternalEntities. + var externalEntityUpdateChannel *channel.SubscribableChannel + if o.nodeType == config.K8sNode { + podUpdateChannel = channel.NewSubscribableChannel("PodUpdate", 100) + } else { + externalEntityUpdateChannel = channel.NewSubscribableChannel("ExternalEntityUpdate", 100) + } + // We set flow poll interval as the time interval for rule deletion in the async // rule cache, which is implemented as part of the idAllocator. This is to preserve // the rule info for populating NetworkPolicy fields in the Flow Exporter even @@ -341,12 +354,19 @@ func run(o *Options) error { statusManagerEnabled := antreaPolicyEnabled loggingEnabled := antreaPolicyEnabled + var gwPort, tunPort uint32 + if o.nodeType == config.K8sNode { + gwPort = nodeConfig.GatewayConfig.OFPort + tunPort = nodeConfig.TunnelOFPort + } + networkPolicyController, err := networkpolicy.NewNetworkPolicyController( antreaClientProvider, ofClient, ifaceStore, nodeConfig.Name, podUpdateChannel, + externalEntityUpdateChannel, groupCounters, groupIDUpdates, antreaPolicyEnabled, @@ -356,10 +376,12 @@ func run(o *Options) error { loggingEnabled, asyncRuleDeleteInterval, o.dnsServerOverride, + o.nodeType, v4Enabled, v6Enabled, - nodeConfig.GatewayConfig.OFPort, - nodeConfig.TunnelOFPort) + gwPort, + tunPort, + ) if err != nil { return fmt.Errorf("error creating new NetworkPolicy controller: %v", err) } @@ -414,6 +436,8 @@ func run(o *Options) error { var cniServer *cniserver.CNIServer var cniPodInfoStore cnipodcache.CNIPodInfoStore + var externalNodeController *externalnode.ExternalNodeController + var localExternalNodeInformer cache.SharedIndexInformer if o.nodeType == config.K8sNode { isChaining := false if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { @@ -443,6 +467,22 @@ func run(o *Options) error { return fmt.Errorf("error initializing CNI server: %v", err) } } + } else { + listOptions := func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeConfig.Name).String() + } + localExternalNodeInformer = crdv1alpha1informers.NewFilteredExternalNodeInformer( + crdClient, + o.config.ExternalNode.ExternalNodeNamespace, + resyncPeriodDisabled, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + listOptions, + ) + externalNodeController, err = externalnode.NewExternalNodeController(ovsBridgeClient, ofClient, localExternalNodeInformer, + ifaceStore, externalEntityUpdateChannel, o.config.ExternalNode.ExternalNodeNamespace, o.config.ExternalNode.PolicyBypassRules) + if err != nil { + return fmt.Errorf("error creating ExternalNode controller: %v", err) + } } var traceflowController *traceflow.Controller @@ -536,6 +576,10 @@ func run(o *Options) error { go podUpdateChannel.Run(stopCh) go cniServer.Run(stopCh) go nodeRouteController.Run(stopCh) + } else { + go externalEntityUpdateChannel.Run(stopCh) + go localExternalNodeInformer.Run(stopCh) + go externalNodeController.Run(stopCh) } if networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec && diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 55384c54cbf..b28eb05b0f7 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/pflag" "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/component-base/featuregate" "k8s.io/klog/v2" @@ -496,9 +497,37 @@ func (o *Options) validateExternalNodeOptions() error { if unsupported != nil { return fmt.Errorf("unsupported features on Virtual Machine: {%s}", strings.Join(unsupported, ", ")) } + if err := o.validatePolicyBypassRulesConfig(); err != nil { + return fmt.Errorf("policyBypassRules configuration is invalid: %w", err) + } return nil } +func (o *Options) validatePolicyBypassRulesConfig() error { + if len(o.config.ExternalNode.PolicyBypassRules) == 0 { + return nil + } + allowedProtocols := sets.NewString("tcp", "udp", "icmp", "ip") + for _, rule := range o.config.ExternalNode.PolicyBypassRules { + if rule.Direction != "ingress" && rule.Direction != "egress" { + return fmt.Errorf("direction %s for policyBypassRule is invalid", rule.Direction) + } + if !allowedProtocols.Has(rule.Protocol) { + return fmt.Errorf("protocol %s for policyBypassRule is invalid", rule.Protocol) + } + if _, _, err := net.ParseCIDR(rule.CIDR); err != nil { + return fmt.Errorf("cidr %s for policyBypassRule is invalid", rule.CIDR) + } + if rule.Port == 0 && (rule.Protocol == "tcp" || rule.Protocol == "udp") { + return fmt.Errorf("missing port for policyBypassRule when protocol is %s", rule.Protocol) + } + if rule.Port < 0 || rule.Port > 65535 { + return fmt.Errorf("port %d for policyBypassRule is invalid", rule.Port) + } + } + return nil + +} func (o *Options) setExternalNodeDefaultOptions() { // Following options are default values for agent running on a Virtual Machine. // They are set to avoid unexpected agent crash. @@ -509,4 +538,7 @@ func (o *Options) setExternalNodeDefaultOptions() { o.config.EnablePrometheusMetrics = new(bool) *o.config.EnablePrometheusMetrics = false } + if o.config.ExternalNode.ExternalNodeNamespace == "" { + o.config.ExternalNode.ExternalNodeNamespace = "default" + } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index f4462321ff4..0a70e4a0919 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -39,6 +39,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/trafficcontrol" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -46,6 +47,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/agent/wireguard" + "antrea.io/antrea/pkg/client/clientset/versioned" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/ovs/ovsctl" @@ -83,6 +85,7 @@ var otherConfigKeysForIPsecCertificates = []string{"certificate", "private_key", // Initializer knows how to setup host networking, OpenVSwitch, and Openflow. type Initializer struct { client clientset.Interface + crdClient versioned.Interface ovsBridgeClient ovsconfig.OVSBridgeClient ofClient openflow.Client routeClient route.Interface @@ -100,14 +103,16 @@ type Initializer struct { connectUplinkToBridge bool // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. - proxyAll bool - networkReadyCh chan<- struct{} - stopCh <-chan struct{} - nodeType config.NodeType + proxyAll bool + networkReadyCh chan<- struct{} + stopCh <-chan struct{} + nodeType config.NodeType + externalNodeNamespace string } func NewInitializer( k8sClient clientset.Interface, + crdClient versioned.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -122,6 +127,7 @@ func NewInitializer( networkReadyCh chan<- struct{}, stopCh <-chan struct{}, nodeType config.NodeType, + externalNodeNamespace string, enableProxy bool, proxyAll bool, connectUplinkToBridge bool, @@ -129,6 +135,7 @@ func NewInitializer( return &Initializer{ ovsBridgeClient: ovsBridgeClient, client: k8sClient, + crdClient: crdClient, ifaceStore: ifaceStore, ofClient: ofClient, routeClient: routeClient, @@ -142,6 +149,7 @@ func NewInitializer( networkReadyCh: networkReadyCh, stopCh: stopCh, nodeType: nodeType, + externalNodeNamespace: externalNodeNamespace, enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, @@ -281,9 +289,16 @@ func (i *Initializer) initInterfaceStore() error { case interfacestore.AntreaTunnel: intf = parseTunnelInterfaceFunc(port, ovsPort) case interfacestore.AntreaHost: - // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a - // specific interface in the interfaceStore. - intf = nil + if port.Name == i.ovsBridge { + // Need not to load the OVS bridge port to the interfaceStore + intf = nil + } else { + var err error + intf, err = externalnode.ParseHostInterfaceConfig(i.ovsBridgeClient, port, ovsPort) + if err != nil { + return err + } + } case interfacestore.AntreaContainer: // The port should be for a container interface. intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true) @@ -1187,11 +1202,24 @@ func (i *Initializer) initNodeLocalConfig() error { } func (i *Initializer) initVMLocalConfig(nodeName string) error { + klog.InfoS("Initializing VM config", "ExternalNode", nodeName) + if err := wait.PollImmediateUntil(10*time.Second, func() (done bool, err error) { + _, err = i.crdClient.CrdV1alpha1().ExternalNodes(i.externalNodeNamespace).Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return true, nil + }, i.stopCh); err != nil { + klog.Info("Stopped waiting for ExternalNode") + return err + } + i.nodeConfig = &config.NodeConfig{ Name: nodeName, Type: config.ExternalNode, OVSBridge: i.ovsBridge, } + klog.InfoS("Finished VM config initialization", "ExternalNode", nodeName) return nil } diff --git a/pkg/agent/agent_linux.go b/pkg/agent/agent_linux.go index 6213126ba94..df00e644a49 100644 --- a/pkg/agent/agent_linux.go +++ b/pkg/agent/agent_linux.go @@ -53,7 +53,7 @@ func (i *Initializer) prepareOVSBridgeForK8sNode() error { uplinkNetConfig := i.nodeConfig.UplinkNetConfig uplinkNetConfig.Name = adapter.Name uplinkNetConfig.MAC = adapter.HardwareAddr - uplinkNetConfig.IP = i.nodeConfig.NodeIPv4Addr + uplinkNetConfig.IPs = []*net.IPNet{i.nodeConfig.NodeIPv4Addr} uplinkNetConfig.Index = adapter.Index // Gateway and DNSServers are not configured at adapter in Linux // Limitation: dynamic DNS servers will be lost after DHCP lease expired diff --git a/pkg/agent/agent_windows.go b/pkg/agent/agent_windows.go index 0fbe4aa6099..72b9959c1c1 100644 --- a/pkg/agent/agent_windows.go +++ b/pkg/agent/agent_windows.go @@ -81,7 +81,7 @@ func (i *Initializer) prepareHNSNetworkAndOVSExtension() error { } i.nodeConfig.UplinkNetConfig.Name = adapter.Name i.nodeConfig.UplinkNetConfig.MAC = adapter.HardwareAddr - i.nodeConfig.UplinkNetConfig.IP = i.nodeConfig.NodeTransportIPv4Addr + i.nodeConfig.UplinkNetConfig.IPs = []*net.IPNet{i.nodeConfig.NodeTransportIPv4Addr} i.nodeConfig.UplinkNetConfig.Index = adapter.Index defaultGW, err := util.GetDefaultGatewayByInterfaceIndex(adapter.Index) if err != nil { diff --git a/pkg/agent/config/node_config.go b/pkg/agent/config/node_config.go index 1b6c9fc29fc..407358ab74e 100644 --- a/pkg/agent/config/node_config.go +++ b/pkg/agent/config/node_config.go @@ -103,7 +103,8 @@ type AdapterNetConfig struct { Name string Index int MAC net.HardwareAddr - IP *net.IPNet + IPs []*net.IPNet + MTU int Gateway string DNSServers string Routes []interface{} diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index 9f38fa375f6..6730c8d2e32 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/metrics" agenttypes "antrea.io/antrea/pkg/agent/types" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" @@ -394,7 +395,8 @@ func toIGMPReportGroupAddressIndexFunc(obj interface{}) ([]string, error) { } // newRuleCache returns a new *ruleCache. -func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache { +func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, externalEntityUpdateSubscriber channel.Subscriber, + serviceGroupIDUpdate <-chan string, nodeType config.NodeType) *ruleCache { rules := cache.NewIndexer( ruleKeyFunc, cache.Indexers{ @@ -413,14 +415,20 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub dirtyRuleHandler: dirtyRuleHandler, groupIDUpdates: serviceGroupIDUpdate, } - // Subscribe Pod update events from CNIServer. - podUpdateSubscriber.Subscribe(cache.processPodUpdate) + if nodeType == config.K8sNode { + // Subscribe Pod update events from CNIServer. + podUpdateSubscriber.Subscribe(cache.processPodUpdate) + } else { + // Subscribe ExternalEntity update events from ExternalNodeController + externalEntityUpdateSubscriber.Subscribe(cache.processExternalEntityUpdate) + } + go cache.processGroupIDUpdates() return cache } // processPodUpdate will be called when CNIServer publishes a Pod update event. -// It finds out AppliedToGroups that contains this Pod and trigger reconciling +// It finds out AppliedToGroups that contain this Pod and triggers reconciliation // of related rules. // It can enforce NetworkPolicies to newly added Pods right after CNI ADD is // done if antrea-controller has computed the Pods' policies and propagated @@ -443,6 +451,24 @@ func (c *ruleCache) processPodUpdate(e interface{}) { } } +// processExternalEntityUpdate will be called when ExternalNodeController publishes an ExternalEntity update event. +// It finds out AppliedToGroups that contain this ExternalNode converted ExternalEntity and triggers reconciliation +// of related rules. +// It can enforce NetworkPolicies to ExternalEntities after ExternalEntityInterface is realised in the interface store. +func (c *ruleCache) processExternalEntityUpdate(e interface{}) { + externalEntityRef := e.(v1beta.ExternalEntityReference) + member := &v1beta.GroupMember{ + ExternalEntity: &externalEntityRef, + } + c.appliedToSetLock.RLock() + defer c.appliedToSetLock.RUnlock() + for group, memberSet := range c.appliedToSetByGroup { + if memberSet.Has(member) { + c.onAppliedToGroupUpdate(group) + } + } +} + // processGroupIDUpdates is an infinite loop that takes Service groupID // update events from the channel, finds out rules that refer this Service in // ToServices field and use dirtyRuleHandler to re-queue these rules. diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 00c01d3ed25..9c0e0120993 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" @@ -278,7 +279,7 @@ func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, *channel.SubscribableCh recorder := newDirtyRuleRecorder() podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) ch2 := make(chan string, 100) - c := newRuleCache(recorder.Record, podUpdateChannel, ch2) + c := newRuleCache(recorder.Record, podUpdateChannel, nil, ch2, config.K8sNode) return c, recorder, podUpdateChannel } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 2e3bfdc1d22..814050eeccd 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" @@ -80,6 +81,8 @@ type Controller struct { multicastEnabled bool // loggingEnabled indicates where Antrea policy audit logging is enabled. loggingEnabled bool + // nodeType indicates type of the Node where Antrea Agent is running on. + nodeType config.NodeType // antreaClientProvider provides interfaces to get antreaClient, which can be // used to watch Antrea AddressGroups, AppliedToGroups, and NetworkPolicies. // We need to get antreaClient dynamically because the apiserver cert can be @@ -119,6 +122,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ifaceStore interfacestore.InterfaceStore, nodeName string, podUpdateSubscriber channel.Subscriber, + externalEntityUpdateSubscriber channel.Subscriber, groupCounters []proxytypes.GroupCounter, groupIDUpdates <-chan string, antreaPolicyEnabled bool, @@ -128,6 +132,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, loggingEnabled bool, asyncRuleDeleteInterval time.Duration, dnsServerOverride string, + nodeType config.NodeType, v4Enabled bool, v6Enabled bool, gwPort, tunPort uint32) (*Controller, error) { @@ -136,6 +141,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, antreaClientProvider: antreaClientGetter, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"), ofClient: ofClient, + nodeType: nodeType, antreaPolicyEnabled: antreaPolicyEnabled, antreaProxyEnabled: antreaProxyEnabled, statusManagerEnabled: statusManagerEnabled, @@ -157,7 +163,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled, multicastEnabled) - c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates) + c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalEntityUpdateSubscriber, groupIDUpdates, nodeType) if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index d22b643a305..a95e63f7725 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -61,7 +61,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator(false) groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, groupCounters, ch2, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", true, false, config.HostGatewayOFPort, config.DefaultTunOFPort) + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort) reconciler := newMockReconciler() controller.reconciler = reconciler controller.antreaPolicyLogger = nil diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index 5c5e4690b18..5034879652d 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -22,6 +22,7 @@ import ( "antrea.io/libOpenflow/protocol" "antrea.io/ofnet/ofctrl" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -122,6 +123,9 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { // response is being generated for locally-originated traffic that went through // kube-proxy and was re-injected into the bridge through antrea-gw. isServiceTraffic := func() bool { + if c.nodeType == config.ExternalNode { + return false + } if c.antreaProxyEnabled { matches := pktIn.GetMatches() if match := getMatchRegField(matches, openflow.ServiceEPStateField); match != nil { @@ -158,7 +162,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { tunPort = uint32(openflow13.P_CONTROLLER) } inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, tunPort) - mutateFunc := getRejectPacketOutMutateFunc(packetOutType) + mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType) if proto == protocol.Type_TCP { // Get TCP data. @@ -256,12 +260,19 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte case RejectServiceLocal: inPort = uint32(sIface.OFPort) case RejectPodRemoteToLocal: - inPort = gwOFPort + if dIface.Type == interfacestore.ExternalEntityInterface { + inPort = uint32(dIface.EntityInterfaceConfig.UplinkPort.OFPort) + } else { + inPort = gwOFPort + } outPort = uint32(dIface.OFPort) case RejectServiceRemoteToLocal: inPort = gwOFPort case RejectLocalToRemote: inPort = uint32(sIface.OFPort) + if sIface.Type == interfacestore.ExternalEntityInterface { + outPort = uint32(sIface.EntityInterfaceConfig.UplinkPort.OFPort) + } case RejectNoAPServiceLocal: inPort = uint32(sIface.OFPort) outPort = gwOFPort @@ -273,7 +284,7 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte } // getRejectPacketOutMutateFunc returns the mutate func of a packetOut based on the RejectType. -func getRejectPacketOutMutateFunc(rejectType RejectType) func(binding.PacketOutBuilder) binding.PacketOutBuilder { +func getRejectPacketOutMutateFunc(rejectType RejectType, nodeType config.NodeType) func(binding.PacketOutBuilder) binding.PacketOutBuilder { var mutatePacketOut func(binding.PacketOutBuilder) binding.PacketOutBuilder switch rejectType { case RejectServiceLocal: @@ -283,6 +294,10 @@ func getRejectPacketOutMutateFunc(rejectType RejectType) func(binding.PacketOutB } case RejectLocalToRemote: tableID := openflow.L3ForwardingTable.GetID() + // L3ForwardingTable is not initialized for ExternalNode case since layer 3 is not needed. + if nodeType == config.ExternalNode { + tableID = openflow.L2ForwardingCalcTable.GetID() + } mutatePacketOut = func(packetOutBuilder binding.PacketOutBuilder) binding.PacketOutBuilder { return packetOutBuilder.AddResubmitAction(nil, &tableID) } diff --git a/pkg/agent/controller/networkpolicy/status_controller_test.go b/pkg/agent/controller/networkpolicy/status_controller_test.go index d719e8baa10..8db35fa0bd9 100644 --- a/pkg/agent/controller/networkpolicy/status_controller_test.go +++ b/pkg/agent/controller/networkpolicy/status_controller_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" ) @@ -51,7 +52,7 @@ func (c *fakeNetworkPolicyControl) getNetworkPolicyStatus() *v1beta2.NetworkPoli } func newTestStatusController() (*StatusController, *ruleCache, *fakeNetworkPolicyControl) { - ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), make(chan string, 100)) + ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), nil, make(chan string, 100), config.K8sNode) statusControl := &fakeNetworkPolicyControl{} statusController := newStatusController(nil, testNode1, ruleCache) statusController.statusControlInterface = statusControl diff --git a/pkg/agent/externalnode/external_node_controller.go b/pkg/agent/externalnode/external_node_controller.go new file mode 100644 index 00000000000..28d5dccbb28 --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller.go @@ -0,0 +1,682 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import ( + "fmt" + "net" + "reflect" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + enlister "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + agentConfig "antrea.io/antrea/pkg/config/agent" + binding "antrea.io/antrea/pkg/ovs/openflow" + "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/ovs/ovsctl" + "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/env" + "antrea.io/antrea/pkg/util/externalnode" + "antrea.io/antrea/pkg/util/ip" + "antrea.io/antrea/pkg/util/k8s" +) + +const ( + controllerName = "ExternalNodeController" + // How long to wait before retrying the processing of an ExternalNode change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Disable resyncing. + resyncPeriod time.Duration = 0 + + ovsExternalIDUplinkName = "uplink-name" + ovsExternalIDUplinkPort = "uplink-port" + ovsExternalIDEntityName = "entity-name" + ovsExternalIDEntityNamespace = "entity-namespace" + ovsExternalIDIPs = "ip-address" + ipsSplitter = "," +) + +var ( + keyFunc = cache.MetaNamespaceKeyFunc + splitKeyFunc = cache.SplitMetaNamespaceKey +) + +type ExternalNodeController struct { + ovsBridgeClient ovsconfig.OVSBridgeClient + ovsctlClient ovsctl.OVSCtlClient + ofClient openflow.Client + externalNodeInformer cache.SharedIndexInformer + externalNodeLister enlister.ExternalNodeLister + externalNodeListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + ifaceStore interfacestore.InterfaceStore + syncedExternalNode *v1alpha1.ExternalNode + // externalEntityUpdateNotifier is used for notifying ExternalEntity updates to NetworkPolicyController. + externalEntityUpdateNotifier channel.Notifier + nodeName string + externalNodeNamespace string + policyBypassRules []agentConfig.PolicyBypassRule +} + +func NewExternalNodeController(ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, externalNodeInformer cache.SharedIndexInformer, + ifaceStore interfacestore.InterfaceStore, externalEntityUpdateNotifier channel.Notifier, externalNodeNamespace string, policyBypassRules []agentConfig.PolicyBypassRule) (*ExternalNodeController, error) { + c := &ExternalNodeController{ + ovsBridgeClient: ovsBridgeClient, + ovsctlClient: ovsctl.NewClient(ovsBridgeClient.GetBridgeName()), + ofClient: ofClient, + externalNodeInformer: externalNodeInformer, + externalNodeLister: enlister.NewExternalNodeLister(externalNodeInformer.GetIndexer()), + externalNodeListerSynced: externalNodeInformer.HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalNode"), + ifaceStore: ifaceStore, + externalEntityUpdateNotifier: externalEntityUpdateNotifier, + policyBypassRules: policyBypassRules, + } + nodeName, err := env.GetNodeName() + if err != nil { + return nil, err + } + c.nodeName = nodeName + c.externalNodeNamespace = externalNodeNamespace + c.externalNodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueExternalNodeAdd, + UpdateFunc: c.enqueueExternalNodeUpdate, + DeleteFunc: c.enqueueExternalNodeDelete, + }, + resyncPeriod) + + return c, nil +} + +// Run will create a worker (goroutine) which will process the ExternalNode events from the work queue. +func (c *ExternalNodeController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.InfoS("Starting controller", "name", controllerName) + defer klog.InfoS("Shutting down controller", "name", controllerName) + + if err := wait.PollImmediateUntil(5*time.Second, func() (done bool, err error) { + if err = c.reconcile(); err != nil { + klog.ErrorS(err, "ExternalNodeController failed during reconciliation") + return false, nil + } + return true, nil + }, stopCh); err != nil { + klog.Info("Stopped ExternalNodeController reconciliation") + return + } + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.externalNodeListerSynced) { + klog.Error("Failed to wait for syncing ExternalNodes cache") + return + } + + c.queue.Add(k8s.NamespacedName(c.externalNodeNamespace, c.nodeName)) + go wait.Until(c.worker, time.Second, stopCh) + + <-stopCh +} + +func (c *ExternalNodeController) enqueueExternalNodeAdd(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode ADD event", "ExternalNode", klog.KObj(en)) +} + +func (c *ExternalNodeController) enqueueExternalNodeUpdate(oldObj interface{}, newObj interface{}) { + oldEN := oldObj.(*v1alpha1.ExternalNode) + newEN := newObj.(*v1alpha1.ExternalNode) + if reflect.DeepEqual(oldEN.Spec.Interfaces, newEN.Spec.Interfaces) { + klog.InfoS("Skip enqueuing ExternalNode UPDATE event as no changes for interfaces", "ExternalNode", klog.KObj(newEN)) + return + } + key, _ := keyFunc(newEN) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode UPDATE event", "ExternalNode", klog.KObj(newEN)) +} + +func (c *ExternalNodeController) enqueueExternalNodeDelete(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode DELETE event", "ExternalNode", klog.KObj(en)) +} + +func (c *ExternalNodeController) reconcile() error { + klog.InfoS("Reconciling for controller", "name", controllerName) + if err := c.reconcileHostUplinkFlows(); err != nil { + return fmt.Errorf("failed to reconcile host uplink flows %v", err) + } + if err := c.reconcilePolicyBypassFlows(); err != nil { + return fmt.Errorf("failed to reconcile reserved flows %v", err) + } + klog.InfoS("Reconciled for controller", "name", controllerName) + return nil +} + +func (c *ExternalNodeController) reconcileHostUplinkFlows() error { + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if err := c.ofClient.InstallVMUplinkFlows(hostIface.InterfaceName, hostIface.OVSPortConfig.OFPort, hostIface.UplinkPort.OFPort); err != nil { + return err + } + klog.InfoS("Reconciled host uplink flow for ExternalEntityInterface", "ifName", hostIface.InterfaceName) + } + return nil +} + +func (c *ExternalNodeController) reconcilePolicyBypassFlows() error { + for _, rule := range c.policyBypassRules { + klog.V(2).InfoS("Installing policy bypass flows", "protocol", rule.Protocol, "CIDR", rule.CIDR, "port", rule.Port, "direction", rule.Direction) + protocol := parseProtocol(rule.Protocol) + _, ipNet, _ := net.ParseCIDR(rule.CIDR) + if err := c.ofClient.InstallPolicyBypassFlows(protocol, ipNet, uint16(rule.Port), rule.Direction == "ingress"); err != nil { + return err + } + } + klog.InfoS("Installed policy bypass flows", "RuleCount", len(c.policyBypassRules)) + return nil +} + +// worker is a long-running function that will continuously call the processNextWorkItem function in +// order to read and process a message on the work queue. +func (c *ExternalNodeController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *ExternalNodeController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + if key, ok := obj.(string); !ok { + c.queue.Forget(obj) + klog.Errorf("Expected string type in work queue but got %#v", obj) + return true + } else if err := c.syncExternalNode(key); err == nil { + // If no error occurs, then forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the work queue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing ExternalNode", "ExternalNode", key) + } + return true +} + +func (c *ExternalNodeController) syncExternalNode(key string) error { + _, name, err := splitKeyFunc(key) + if err != nil { + // This err should not occur. + return err + } + en, err := c.externalNodeLister.ExternalNodes(c.externalNodeNamespace).Get(name) + if errors.IsNotFound(err) { + return c.deleteExternalNode() + } + + if c.syncedExternalNode == nil { + return c.addExternalNode(en) + } else { + return c.updateExternalNode(c.syncedExternalNode, en) + } +} + +func (c *ExternalNodeController) addExternalNode(en *v1alpha1.ExternalNode) error { + klog.InfoS("Adding ExternalNode", "ExternalNode", klog.KObj(en)) + eeName, err := externalnode.GenExternalEntityName(en) + if err != nil { + return err + } + ifName, ips, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.addInterface(ifName, en.Namespace, eeName, ips); err != nil { + return err + } + c.syncedExternalNode = en + // Notify the ExternalEntity event to NetworkPolicyController. + c.externalEntityUpdateNotifier.Notify(v1beta2.ExternalEntityReference{ + Name: eeName, + Namespace: en.Namespace, + }) + return nil +} + +func (c *ExternalNodeController) addInterface(ifName string, eeNamespace string, eeName string, ips []string) error { + hostIface, ifaceExists := c.ifaceStore.GetInterfaceByName(ifName) + if !ifaceExists { + klog.InfoS("Creating OVS ports and flows for ExternalEntityInterface", "ifName", ifName, "externalEntity", eeName, "ips", ips) + uplinkName := util.GenerateUplinkInterfaceName(ifName) + iface, err := c.createOVSPortsAndFlows(uplinkName, ifName, eeNamespace, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil + } + klog.InfoS("Updating OVS port data", "ifName", ifName, "externalEntity", eeName, "ips", ips) + portUUID := hostIface.PortUUID + portName := hostIface.InterfaceName + portData, ovsErr := c.ovsBridgeClient.GetPortData(portUUID, portName) + if ovsErr != nil { + return ovsErr + } + preEEName := portData.ExternalIDs[ovsExternalIDEntityName] + preIPs := sets.NewString(strings.Split(portData.ExternalIDs[ovsExternalIDIPs], ipsSplitter)...) + if preEEName == eeName && sets.NewString(ips...).Equal(preIPs) { + klog.InfoS("Skipping updating OVS port data as both entity name and ip are not changed", "ifName", ifName) + return nil + } + + iface, err := c.updateOVSPortsData(hostIface, portData, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil +} + +func (c *ExternalNodeController) updateExternalNode(preEN *v1alpha1.ExternalNode, curEN *v1alpha1.ExternalNode) error { + klog.InfoS("Updating ExternalNode", "ExternalNode", klog.KObj(curEN)) + if reflect.DeepEqual(preEN.Spec.Interfaces[0], curEN.Spec.Interfaces[0]) { + klog.InfoS("Skip processing ExternalNode update as no changes for Interface[0]", "ExternalNode", klog.KObj(curEN)) + return nil + } + preEEName, err := externalnode.GenExternalEntityName(preEN) + if err != nil { + return err + } + preIfName, preIPs, err := getHostInterfaceName(preEN.Spec.Interfaces[0]) + if err != nil { + return err + } + curEEName, err := externalnode.GenExternalEntityName(curEN) + if err != nil { + return err + } + curIfName, curIPs, err := getHostInterfaceName(curEN.Spec.Interfaces[0]) + if err != nil { + return err + } + if preIfName != curIfName { + klog.InfoS("Found interface name is changed", "preName", preIfName, "curName", curIfName) + if err = c.addInterface(curIfName, curEN.Namespace, curEEName, curIPs); err != nil { + return err + } + ifaceConfig, ifaceExists := c.ifaceStore.GetInterfaceByName(preIfName) + if ifaceExists { + if err = c.deleteInterface(ifaceConfig); err != nil { + return err + } + } + } else if !reflect.DeepEqual(preIPs, curIPs) || preEEName != curEEName { + klog.InfoS("Found interface configuration is changed", "preIPs", preIPs, "preExternalEntity", preEEName, + "curIPs", curIPs, "curExternalEntity", curEEName) + if err = c.addInterface(curIfName, curEN.Namespace, curEEName, curIPs); err != nil { + return err + } + } + c.syncedExternalNode = curEN + // Notify the ExternalEntity event to NetworkPolicyController. + c.externalEntityUpdateNotifier.Notify(v1beta2.ExternalEntityReference{ + Name: curEEName, + Namespace: curEN.Namespace, + }) + return nil +} + +func (c *ExternalNodeController) deleteExternalNode() error { + if err := c.deleteInterfaces(); err != nil { + return err + } + c.syncedExternalNode = nil + return nil +} + +func (c *ExternalNodeController) deleteInterfaces() error { + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if err := c.deleteInterface(hostIface); err != nil { + return err + } + } + return nil +} + +func (c *ExternalNodeController) deleteInterface(interfaceConfig *interfacestore.InterfaceConfig) error { + klog.InfoS("Deleting interface", "ifName", interfaceConfig.InterfaceName) + if err := c.removeOVSPortsAndFlows(interfaceConfig); err != nil { + return err + } + c.ifaceStore.DeleteInterface(interfaceConfig) + return nil +} + +func (c *ExternalNodeController) createOVSPortsAndFlows(uplinkName, hostIFName, eeNamespace, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + iface, addrs, routes, err := util.GetInterfaceConfig(hostIFName) + if err != nil { + return nil, err + } + adapterConfig := &config.AdapterNetConfig{ + Name: hostIFName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + MTU: iface.MTU, + } + if err = util.RenameInterface(hostIFName, uplinkName); err != nil { + return nil, err + } + success := false + defer func() { + if !success { + if err = util.RenameInterface(uplinkName, hostIFName); err != nil { + klog.ErrorS(err, "Failed to restore uplink name back to host interface name. Manual cleanup is required", "uplinkName", uplinkName, "hostIFName", hostIFName) + } + } + }() + + // Create uplink port in OVS. + uplinkExternalIDs := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaUplink, + } + uplinkUUID, ovsErr := c.ovsBridgeClient.CreatePort(uplinkName, uplinkName, uplinkExternalIDs) + if ovsErr != nil { + return nil, fmt.Errorf("failed to create uplink port %s in OVS, err %v", uplinkName, ovsErr) + } + defer func() { + if !success { + if ovsErr = c.ovsBridgeClient.DeletePort(uplinkUUID); ovsErr != nil { + klog.ErrorS(err, "Failed to delete uplink port. Manual cleanup is required", "portUUID", uplinkUUID, "uplinkName", uplinkName) + } + } + }() + uplinkOFPort, ovsErr := c.ovsBridgeClient.GetOFPort(uplinkName, false) + if ovsErr != nil { + return nil, ovsErr + } + klog.InfoS("Added uplink port in OVS", "port", uplinkOFPort, "uplinkName", uplinkName) + + // Create host port in OVS. + attachInfo := GetOVSAttachInfo(uplinkName, uplinkUUID, eeName, eeNamespace, ips) + hostIfUUID, ovsErr := c.ovsBridgeClient.CreateInternalPort(hostIFName, 0, adapterConfig.MAC.String(), attachInfo) + if ovsErr != nil { + return nil, fmt.Errorf("failed to create OVS internal port for host interface %s, err %v", hostIFName, ovsErr) + } + defer func() { + if !success { + if ovsErr = c.ovsBridgeClient.DeletePort(hostIfUUID); ovsErr != nil { + klog.ErrorS(err, "Failed to delete host interface port. Manual cleanup is required", "portUUID", hostIfUUID, "hostIFName", hostIFName) + } + } + }() + hostOFPort, ovsErr := c.ovsBridgeClient.GetOFPort(hostIFName, false) + if ovsErr != nil { + return nil, ovsErr + } + klog.InfoS("Created an OVS internal port for host interface", "ofPort", hostOFPort, "interfaceName", hostIFName) + // Move configurations from the uplink to host port + if err = c.moveIFConfigurations(adapterConfig, uplinkName, hostIFName); err != nil { + return nil, err + } + klog.InfoS("Moved configurations to the host interface", "hostInterface", hostIFName) + if err = c.ofClient.InstallVMUplinkFlows(hostIFName, hostOFPort, uplinkOFPort); err != nil { + return nil, err + } + klog.InfoS("Added uplink and host port in OVS and installed openflow entries", "uplink", uplinkName, "hostInterface", hostIFName) + success = true + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + hostIFConfig := &interfacestore.InterfaceConfig{ + Type: interfacestore.ExternalEntityInterface, + InterfaceName: hostIFName, + IPs: ifIPs, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: hostIfUUID, + OFPort: hostOFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: eeNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkUUID, + OFPort: uplinkOFPort, + }, + }, + } + return hostIFConfig, nil +} + +func GetOVSAttachInfo(uplinkName, uplinkUUID, entityName, entityNamespace string, ips []string) map[string]interface{} { + attachInfo := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + if uplinkName != "" { + attachInfo[ovsExternalIDUplinkName] = uplinkName + } + if uplinkUUID != "" { + attachInfo[ovsExternalIDUplinkPort] = uplinkUUID + } + if entityName != "" { + attachInfo[ovsExternalIDEntityName] = entityName + } + if entityNamespace != "" { + attachInfo[ovsExternalIDEntityNamespace] = entityNamespace + } + if len(ips) != 0 { + attachInfo[ovsExternalIDIPs] = strings.Join(ips, ipsSplitter) + } + + return attachInfo +} + +func (c *ExternalNodeController) updateOVSPortsData(interfaceConfig *interfacestore.InterfaceConfig, portData *ovsconfig.OVSPortData, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + attachInfo := map[string]interface{}{ + ovsExternalIDUplinkName: portData.ExternalIDs[ovsExternalIDUplinkName], + ovsExternalIDUplinkPort: portData.ExternalIDs[ovsExternalIDUplinkPort], + ovsExternalIDEntityName: eeName, + ovsExternalIDEntityNamespace: portData.ExternalIDs[ovsExternalIDEntityNamespace], + ovsExternalIDIPs: strings.Join(ips, ipsSplitter), + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + err := c.ovsBridgeClient.SetPortExternalIDs(interfaceConfig.InterfaceName, attachInfo) + if err != nil { + return nil, err + } + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + iface := &interfacestore.InterfaceConfig{ + InterfaceName: interfaceConfig.InterfaceName, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.PortUUID, + OFPort: interfaceConfig.OFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: interfaceConfig.EntityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.UplinkPort.PortUUID, + OFPort: interfaceConfig.UplinkPort.OFPort, + }, + }, + IPs: ifIPs, + } + return iface, nil +} + +func (c *ExternalNodeController) removeOVSPortsAndFlows(interfaceConfig *interfacestore.InterfaceConfig) error { + portUUID := interfaceConfig.PortUUID + portName := interfaceConfig.InterfaceName + if err := c.ofClient.UninstallVMUplinkFlows(portName); err != nil { + return fmt.Errorf("failed to uninstall uplink and host port openflow entries, portName %s, err %v", portName, err) + } + klog.InfoS("Removed the flows installed to forward packet between uplinkPort and hostPort", "hostInterface", portName) + hostIFName := interfaceConfig.InterfaceName + uplinkIfName := util.GenerateUplinkInterfaceName(portName) + uplinkPortID := interfaceConfig.UplinkPort.PortUUID + iface, addrs, routes, err := util.GetInterfaceConfig(hostIFName) + if err != nil { + return err + } + adapterConfig := &config.AdapterNetConfig{ + Name: hostIFName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + MTU: iface.MTU, + } + if ovsErr := c.ovsBridgeClient.DeletePort(portUUID); ovsErr != nil { + return fmt.Errorf("failed to delete host port %s, err %v", hostIFName, ovsErr) + } + klog.InfoS("Deleted host port in OVS", "hostInterface", hostIFName) + if ovsErr := c.ovsBridgeClient.DeletePort(uplinkPortID); ovsErr != nil { + return fmt.Errorf("failed to delete uplink port %s, err %v", uplinkIfName, ovsErr) + } + klog.InfoS("Deleted uplink port in OVS", "uplinkIfName", uplinkIfName) + defer func() { + // Delete host interface from OVS datapath if it exists. + // This is to resolve an issue that OVS fails to remove the interface from datapath. It might happen because the interface + // is busy when OVS tries to remove it with the OVSDB interface deletion event. + if err := c.ovsctlClient.DeleteDPInterface(hostIFName); err != nil { + klog.ErrorS(err, "Failed to delete host interface from OVS datapath", "interface", hostIFName) + } + }() + + // Wait until the host interface created by OVS is removed. + if err = wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (bool, error) { + return !util.HostInterfaceExists(hostIFName), nil + }); err != nil { + return fmt.Errorf("failed to wait for host interface %s deletion in 2s, err %v", hostIFName, err) + } + // Recover the uplink interface's name. + if err = util.RenameInterface(uplinkIfName, hostIFName); err != nil { + return err + } + klog.InfoS("Recovered uplink name to the host interface name", "uplinkIfName", uplinkIfName, "hostInterface", hostIFName) + // Move the IP configurations back to the host interface. + if err = c.moveIFConfigurations(adapterConfig, "", hostIFName); err != nil { + return err + } + klog.InfoS("Moved back configuration to the host interface", "hostInterface", hostIFName) + return nil +} + +func getHostInterfaceName(iface v1alpha1.NetworkInterface) (string, []string, error) { + ifName := "" + ips := sets.NewString() + for _, ipStr := range iface.IPs { + var ipFilter *ip.DualStackIPs + ifIP := net.ParseIP(ipStr) + if ifIP.To4() != nil { + ipFilter = &ip.DualStackIPs{IPv4: ifIP} + } else { + ipFilter = &ip.DualStackIPs{IPv6: ifIP} + } + _, _, link, err := util.GetIPNetDeviceFromIP(ipFilter, sets.NewString()) + if err == nil { + klog.InfoS("Using the interface", "linkName", link.Name, "IP", ipStr) + ips.Insert(ipStr) + if ifName == "" { + ifName = link.Name + } else if ifName != link.Name { + return "", ips.List(), fmt.Errorf("find different interfaces by IPs, ifName %s, linkName %s", ifName, link.Name) + } + } else { + klog.ErrorS(err, "Failed to get device from IP", "ip", ifIP) + } + } + if ifName == "" { + return "", ips.List(), fmt.Errorf("cannot find interface via IPs %v", iface.IPs) + } + return ifName, ips.List(), nil + +} + +func ParseHostInterfaceConfig(ovsBridgeClient ovsconfig.OVSBridgeClient, portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) (*interfacestore.InterfaceConfig, error) { + var interfaceConfig *interfacestore.InterfaceConfig + interfaceConfig = &interfacestore.InterfaceConfig{ + InterfaceName: portData.Name, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: portConfig, + } + var hostUplinkConfig *interfacestore.EntityInterfaceConfig + entityIPArr := strings.Split(portData.ExternalIDs[ovsExternalIDIPs], ipsSplitter) + var entityIPs []net.IP + for _, ipStr := range entityIPArr { + entityIPs = append(entityIPs, net.ParseIP(ipStr)) + } + interfaceConfig.IPs = entityIPs + uplinkName, _ := portData.ExternalIDs[ovsExternalIDUplinkName] + uplinkPortUUID, _ := portData.ExternalIDs[ovsExternalIDUplinkPort] + uplinkPortData, ovsErr := ovsBridgeClient.GetPortData(uplinkPortUUID, uplinkName) + if ovsErr != nil { + return nil, ovsErr + } + entityName, _ := portData.ExternalIDs[ovsExternalIDEntityName] + entityNamespace, _ := portData.ExternalIDs[ovsExternalIDEntityNamespace] + hostUplinkConfig = &interfacestore.EntityInterfaceConfig{ + EntityName: entityName, + EntityNamespace: entityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkPortUUID, + OFPort: uplinkPortData.OFPort, + }, + } + interfaceConfig.EntityInterfaceConfig = hostUplinkConfig + return interfaceConfig, nil +} + +func parseProtocol(protocol string) binding.Protocol { + var proto binding.Protocol + switch protocol { + case "tcp": + proto = binding.ProtocolTCP + case "udp": + proto = binding.ProtocolUDP + case "icmp": + proto = binding.ProtocolICMP + case "ip": + proto = binding.ProtocolIP + } + return proto +} diff --git a/pkg/agent/externalnode/external_node_controller_linux.go b/pkg/agent/externalnode/external_node_controller_linux.go new file mode 100644 index 00000000000..69758733dac --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller_linux.go @@ -0,0 +1,59 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import ( + "fmt" + + "github.com/vishvananda/netlink" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/util" +) + +func (c *ExternalNodeController) moveIFConfigurations(adapterConfig *config.AdapterNetConfig, src string, dst string) error { + dstLink, err := netlink.LinkByName(dst) + if err != nil { + return fmt.Errorf("failed to find link for destination %s, err %v", dst, err) + } + if src != "" { + srcLink, err := netlink.LinkByName(src) + if err != nil { + return fmt.Errorf("failed to find link for source %s, err %v", src, err) + } + if err := netlink.LinkSetMTU(dstLink, adapterConfig.MTU); err != nil { + return err + } + if err := netlink.LinkSetUp(dstLink); err != nil { + return err + } + if err := util.RemoveLinkIPs(srcLink); err != nil { + return err + } + if err := util.RemoveLinkRoutes(srcLink); err != nil { + return err + } + } + dstIndex := dstLink.Attrs().Index + // Configure the source interface's IPs on the destination interface. + if err := util.ConfigureLinkAddresses(dstIndex, adapterConfig.IPs); err != nil { + return err + } + // Configure the source interface's routes on the destination interface. + if err := util.ConfigureLinkRoutes(dstLink, adapterConfig.Routes); err != nil { + return err + } + return nil +} diff --git a/pkg/agent/externalnode/external_node_controller_windows.go b/pkg/agent/externalnode/external_node_controller_windows.go new file mode 100644 index 00000000000..390133de5af --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller_windows.go @@ -0,0 +1,21 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import "antrea.io/antrea/pkg/agent/config" + +func (c *ExternalNodeController) moveIFConfigurations(adapterConfig *config.AdapterNetConfig, src string, dst string) error { + return nil +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index b90ca452481..45f54003c9f 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -89,8 +89,6 @@ type EntityInterfaceConfig struct { EntityNamespace string // UplinkPort is the OVS port configuration for the uplink, which is a pair port of this interface on OVS. UplinkPort *OVSPortConfig - // HostIfaceIndex is the index of the host interface created by this OVS internal port. - HostIfaceIndex int } type InterfaceConfig struct { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index e2b310341ac..4bfe94bae6d 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -352,7 +352,7 @@ type Client interface { // InstallPolicyBypassFlows installs flows to bypass the NetworkPolicy rules on the traffic with the given ipnet // or ip, port, protocol and direction. It is used to bypass NetworkPolicy enforcement on a VM for the particular // traffic. - InstallPolicyBypassFlows(protocol binding.Protocol, ipnet *net.IPNet, ip net.IP, port uint16, isIngress bool) error + InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error } // GetFlowTableStatus returns an array of flow table status. diff --git a/pkg/agent/openflow/externalnode_connectivity.go b/pkg/agent/openflow/externalnode_connectivity.go index 98e40fcc439..26653edaade 100644 --- a/pkg/agent/openflow/externalnode_connectivity.go +++ b/pkg/agent/openflow/externalnode_connectivity.go @@ -152,7 +152,7 @@ func (f *featureExternalNodeConnectivity) replayFlows() []binding.Flow { return flows } -func (f *featureExternalNodeConnectivity) policyBypassFlow(protocol binding.Protocol, ipnet *net.IPNet, ip net.IP, port uint16, isIngress bool) binding.Flow { +func (f *featureExternalNodeConnectivity) policyBypassFlow(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() var flowBuilder binding.FlowBuilder var nextTable *Table @@ -161,26 +161,16 @@ func (f *featureExternalNodeConnectivity) policyBypassFlow(protocol binding.Prot Cookie(cookieID). MatchProtocol(protocol). MatchCTStateNew(true). - MatchCTStateTrk(true) - if ipnet != nil { - flowBuilder.MatchSrcIPNet(*ipnet) - } - if ip != nil { - flowBuilder.MatchSrcIP(ip) - } + MatchCTStateTrk(true). + MatchSrcIPNet(*ipNet) nextTable = IngressMetricTable } else { flowBuilder = EgressSecurityClassifierTable.ofTable.BuildFlow(priorityNormal). Cookie(cookieID). MatchProtocol(protocol). MatchCTStateNew(true). - MatchCTStateTrk(true) - if ipnet != nil { - flowBuilder.MatchDstIPNet(*ipnet) - } - if ip != nil { - flowBuilder.MatchDstIP(ip) - } + MatchCTStateTrk(true). + MatchDstIPNet(*ipNet) nextTable = EgressMetricTable } return flowBuilder.MatchDstPort(port, nil). @@ -210,8 +200,8 @@ func (c *client) UninstallVMUplinkFlows(hostIFName string) error { return c.deleteFlows(c.featureExternalNodeConnectivity.uplinkFlowCache, hostIFName) } -func (c *client) InstallPolicyBypassFlows(protocol binding.Protocol, ipnet *net.IPNet, ip net.IP, port uint16, isIngress bool) error { - flow := c.featureExternalNodeConnectivity.policyBypassFlow(protocol, ipnet, ip, port, isIngress) +func (c *client) InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error { + flow := c.featureExternalNodeConnectivity.policyBypassFlow(protocol, ipNet, port, isIngress) if err := c.ofEntryOperations.Add(flow); err != nil { return err } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 8c08447fce5..0874f784ed6 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -423,17 +423,17 @@ func (mr *MockClientMockRecorder) InstallPodSNATFlows(arg0, arg1, arg2 interface } // InstallPolicyBypassFlows mocks base method -func (m *MockClient) InstallPolicyBypassFlows(arg0 openflow.Protocol, arg1 *net.IPNet, arg2 net.IP, arg3 uint16, arg4 bool) error { +func (m *MockClient) InstallPolicyBypassFlows(arg0 openflow.Protocol, arg1 *net.IPNet, arg2 uint16, arg3 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallPolicyBypassFlows", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "InstallPolicyBypassFlows", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // InstallPolicyBypassFlows indicates an expected call of InstallPolicyBypassFlows -func (mr *MockClientMockRecorder) InstallPolicyBypassFlows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallPolicyBypassFlows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyBypassFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyBypassFlows), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyBypassFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyBypassFlows), arg0, arg1, arg2, arg3) } // InstallPolicyRuleFlows mocks base method diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 9d267283111..12f500d6202 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -415,3 +415,17 @@ func GenerateRandomMAC() net.HardwareAddr { buf[0] |= 2 return buf } + +func GetIPNetsByLink(link *net.Interface) ([]*net.IPNet, error) { + addrList, err := link.Addrs() + if err != nil { + return nil, err + } + var addrs []*net.IPNet + for _, a := range addrList { + if ipNet, ok := a.(*net.IPNet); ok { + addrs = append(addrs, ipNet) + } + } + return addrs, nil +} diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go index 4ac5220e7e6..94dc5ebe988 100644 --- a/pkg/agent/util/net_linux.go +++ b/pkg/agent/util/net_linux.go @@ -23,6 +23,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "time" "github.com/containernetworking/plugins/pkg/ip" @@ -193,7 +194,7 @@ func ConfigureLinkAddresses(idx int, ipNets []*net.IPNet) error { for _, addr := range addrsToAdd { klog.V(2).Infof("Adding address %v to interface %s", addr, ifaceName) - if err := netlink.AddrAdd(link, addr); err != nil { + if err := netlink.AddrAdd(link, addr); err != nil && !strings.Contains(err.Error(), "file exists") { return fmt.Errorf("failed to add address %v to interface %s: %v", addr, ifaceName, err) } } @@ -240,6 +241,34 @@ func DeleteOVSPort(brName, portName string) error { return cmd.Run() } +func HostInterfaceExists(ifName string) bool { + _, err := netlink.LinkByName(ifName) + if err == nil { + return true + } + if _, ok := err.(netlink.LinkNotFoundError); ok { + return false + } + klog.ErrorS(err, "Failed to find host interface", "name", ifName) + return false +} + +func GetInterfaceConfig(ifName string) (*net.Interface, []*net.IPNet, []interface{}, error) { + iface, err := net.InterfaceByName(ifName) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to get interface by name %s, err %v", ifName, err) + } + addrs, err := GetIPNetsByLink(iface) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to get address for interface %s, err %v", ifName, err) + } + routes, err := getRoutesOnInterface(iface.Index) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to get routes for iface.Index %d, err %v", iface.Index, err) + } + return iface, addrs, routes, nil +} + func RenameInterface(from, to string) error { klog.InfoS("Renaming interface", "oldName", from, "newName", to) var renameErr error @@ -257,6 +286,59 @@ func RenameInterface(from, to string) error { return nil } +func RemoveLinkIPs(link netlink.Link) error { + addrs, err := netlink.AddrList(link, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range addrs { + if err = netlink.AddrDel(link, &addrs[i]); err != nil { + return err + } + } + return nil +} + +func RemoveLinkRoutes(link netlink.Link) error { + routes, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range routes { + if err = netlink.RouteDel(&routes[i]); err != nil { + return err + } + } + return nil +} + +func ConfigureLinkRoutes(link netlink.Link, routes []interface{}) error { + for _, r := range routes { + rt := r.(netlink.Route) + rt.LinkIndex = link.Attrs().Index + if err := netlink.RouteReplace(&rt); err != nil { + return err + } + } + return nil +} + +func getRoutesOnInterface(linkIndex int) ([]interface{}, error) { + link, err := netlink.LinkByIndex(linkIndex) + if err != nil { + return nil, err + } + rs, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return nil, err + } + var routes []interface{} + for _, r := range rs { + routes = append(routes, r) + } + return routes, nil +} + func renameHostInterface(oriName string, newName string) error { link, err := netlink.LinkByName(oriName) if err != nil { diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index ac5abc8c7a5..fe3476a1be9 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -895,3 +895,13 @@ func ReplaceNetNeighbor(neighbor *Neighbor) error { func VirtualAdapterName(name string) string { return fmt.Sprintf("%s (%s)", ContainerVNICPrefix, name) } + +// TODO: Implement GetInterfaceConfig for Windows +func GetInterfaceConfig(ifName string) (*net.Interface, []*net.IPNet, []interface{}, error) { + return nil, nil, nil, nil +} + +// TODO: Implement RenameInterface for Windows +func RenameInterface(from, to string) error { + return nil +} diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 321e6a95ef8..afaebdec506 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -204,6 +204,8 @@ type AgentConfig struct { // NodeType is type of the Node where Antrea Agent is running. // Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". NodeType string `yaml:"nodeType,omitempty"` + // ExternalNode related configurations. + ExternalNode ExternalNodeConfig `yaml:"externalNode,omitempty"` } type AntreaProxyConfig struct { @@ -274,3 +276,26 @@ type MulticlusterConfig struct { // The default is antrea-agent's Namespace. Namespace string `yaml:"namespace,omitempty"` } + +type ExternalNodeConfig struct { + // The expected Namespace in which the ExternalNode should be created for a VM or baremetal server Node. + // The default value is "default". + // It is used only when NodeType is externalNode. + ExternalNodeNamespace string `yaml:"externalNodeNamespace,omitempty"` + // The policy bypass rules define traffic that should bypass NetworkPolicy rules. + // Each rule contains the following four attributes: + // direction (ingress|egress), protocol(tcp/udp/icmp/ip), remote CIDR, dst port (ICMP doesn't require), + // It is used only when NodeType is externalNode. + PolicyBypassRules []PolicyBypassRule `yaml:"policyBypassRules,omitempty"` +} + +type PolicyBypassRule struct { + // The direction value can be ingress or egress. + Direction string `yaml:"direction,omitempty"` + // The protocol which traffic must match. Supported values are TCP, UDP, ICMP and IP. + Protocol string `yaml:"protocol,omitempty"` + // CIDR marks the destination CIDR for Egress and source CIDR for Ingress. + CIDR string `json:"cidr,omitempty"` + // The destination port of the given protocol. + Port int `yaml:"port,omitempty"` +} diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 35cb67a0efd..a26b1a281ee 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -1145,7 +1145,7 @@ func (n *NetworkPolicyController) getMemberSetForGroupType(groupType grouping.Gr groupMemberSet.Insert(podToGroupMember(pod, true)) } for _, ee := range externalEntities { - groupMemberSet.Insert(externalEntityToGroupMember(ee)) + groupMemberSet.Insert(externalEntityToGroupMember(ee, true)) } return groupMemberSet } @@ -1194,10 +1194,9 @@ func nodeToGroupMember(node *v1.Node) (member *controlplane.GroupMember) { return } -func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity) *controlplane.GroupMember { +func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity, includeIP bool) *controlplane.GroupMember { memberEntity := &controlplane.GroupMember{} namedPorts := make([]controlplane.NamedPort, len(ee.Spec.Ports)) - var ips []controlplane.IPAddress for i, port := range ee.Spec.Ports { namedPorts[i] = controlplane.NamedPort{ Port: port.Port, @@ -1205,8 +1204,10 @@ func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity) *controlplane.Grou Protocol: controlplane.Protocol(port.Protocol), } } - for _, ep := range ee.Spec.Endpoints { - ips = append(ips, ipStrToIPAddress(ep.IP)) + if includeIP { + for _, ep := range ee.Spec.Endpoints { + memberEntity.IPs = append(memberEntity.IPs, ipStrToIPAddress(ep.IP)) + } } eeRef := controlplane.ExternalEntityReference{ Name: ee.Name, @@ -1214,7 +1215,6 @@ func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity) *controlplane.Grou } memberEntity.ExternalEntity = &eeRef memberEntity.Ports = namedPorts - memberEntity.IPs = ips return memberEntity } @@ -1266,7 +1266,7 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error { if entitySet == nil { entitySet = controlplane.GroupMemberSet{} } - entitySet.Insert(externalEntityToGroupMember(extEntity)) + entitySet.Insert(externalEntityToGroupMember(extEntity, false)) memberSetByNode[extEntity.Spec.ExternalNode] = entitySet appGroupNodeNames.Insert(extEntity.Spec.ExternalNode) } diff --git a/pkg/ovs/ovsctl/appctl.go b/pkg/ovs/ovsctl/appctl.go index cf1e36eb681..5ac563e49b5 100644 --- a/pkg/ovs/ovsctl/appctl.go +++ b/pkg/ovs/ovsctl/appctl.go @@ -19,6 +19,7 @@ import ( "bytes" "fmt" "net" + "strconv" "strings" "k8s.io/klog/v2" @@ -227,3 +228,38 @@ func (c *ovsCtlClient) GetDPFeatures() (map[DPFeature]bool, error) { } return features, nil } + +// DeleteDPInterface deletes OVS datapath interface, and it returns with no error if the interface does not exist. +func (c *ovsCtlClient) DeleteDPInterface(name string) error { + cmd := fmt.Sprintf("dpctl/show ovs-system") + out, execErr := c.runAppCtl(cmd, false) + if execErr != nil { + return execErr + } + scanner := bufio.NewScanner(strings.NewReader(string(out))) + scanner.Split(bufio.ScanLines) + for scanner.Scan() { + line := scanner.Text() + fields := strings.Split(line, ": ") + if len(fields) < 2 { + continue + } + nameStr := fields[1] + ifName := strings.Split(nameStr, " (internal)")[0] + if ifName == name { + portStr := strings.Split(fields[0], " ")[1] + port, err := strconv.Atoi(portStr) + if err != nil { + return fmt.Errorf("failed to parse portNum from portStr %s, line %s", portStr, line) + } + cmd = fmt.Sprintf("dpctl/del-if ovs-system %d", port) + _, execErr = c.runAppCtl(cmd, false) + if execErr == nil || strings.Contains(execErr.Error(), "No such device") { + return nil + } else { + return execErr + } + } + } + return nil +} diff --git a/pkg/ovs/ovsctl/interface.go b/pkg/ovs/ovsctl/interface.go index b6c74088621..c925e7c5db4 100644 --- a/pkg/ovs/ovsctl/interface.go +++ b/pkg/ovs/ovsctl/interface.go @@ -60,6 +60,8 @@ type OVSCtlClient interface { RunAppctlCmd(cmd string, needsBridge bool, args ...string) ([]byte, *ExecError) // GetDPFeatures executes "ovs-appctl dpif/show-dp-features" to check supported DP features. GetDPFeatures() (map[DPFeature]bool, error) + // DeleteDPInterface executes "ovs-appctl dpctl/del-if ovs-system $name" to delete OVS datapath interface. + DeleteDPInterface(name string) error } type BadRequestError string diff --git a/pkg/ovs/ovsctl/testing/mock_ovsctl.go b/pkg/ovs/ovsctl/testing/mock_ovsctl.go index 05e0663d8a9..fc8b37f2c35 100644 --- a/pkg/ovs/ovsctl/testing/mock_ovsctl.go +++ b/pkg/ovs/ovsctl/testing/mock_ovsctl.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,6 +48,20 @@ func (m *MockOVSCtlClient) EXPECT() *MockOVSCtlClientMockRecorder { return m.recorder } +// DeleteDPInterface mocks base method +func (m *MockOVSCtlClient) DeleteDPInterface(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteDPInterface", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteDPInterface indicates an expected call of DeleteDPInterface +func (mr *MockOVSCtlClientMockRecorder) DeleteDPInterface(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteDPInterface", reflect.TypeOf((*MockOVSCtlClient)(nil).DeleteDPInterface), arg0) +} + // DumpFlows mocks base method func (m *MockOVSCtlClient) DumpFlows(arg0 ...string) ([]string, error) { m.ctrl.T.Helper() diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index ca8b25a9c9d..e81c2d8739b 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -196,9 +196,11 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { Name: "fake-uplink", Index: 0, MAC: uplinkMAC, - IP: &net.IPNet{ - IP: nil, - Mask: nil, + IPs: []*net.IPNet{ + { + IP: nil, + Mask: nil, + }, }, Gateway: "", DNSServers: "",