diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index f75a19c294b..66247216af2 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -67,9 +67,6 @@ featureGates: # Enable certificated-based authentication for IPsec. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "IPsecCertAuth" "default" false) }} -# Enable running agent on an unmanaged VM/BM. -{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "ExternalNode" "default" false) }} - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 5525fc408f8..9dade6f60a6 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2679,9 +2679,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3780,7 +3777,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 34d282c5003d66a4a1741b1940b64ac6eb464275eed596ebf3a4242864bbb88a + checksum/config: e58a0311b8ecc3d02a5c5f9ab89a6d5e98beb2a7078f5cd36e6007bb860b1018 labels: app: antrea component: antrea-agent @@ -4020,7 +4017,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 34d282c5003d66a4a1741b1940b64ac6eb464275eed596ebf3a4242864bbb88a + checksum/config: e58a0311b8ecc3d02a5c5f9ab89a6d5e98beb2a7078f5cd36e6007bb860b1018 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 55eb55f8be2..21fac35152a 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2679,9 +2679,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3780,7 +3777,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 34d282c5003d66a4a1741b1940b64ac6eb464275eed596ebf3a4242864bbb88a + checksum/config: e58a0311b8ecc3d02a5c5f9ab89a6d5e98beb2a7078f5cd36e6007bb860b1018 labels: app: antrea component: antrea-agent @@ -4022,7 +4019,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 34d282c5003d66a4a1741b1940b64ac6eb464275eed596ebf3a4242864bbb88a + checksum/config: e58a0311b8ecc3d02a5c5f9ab89a6d5e98beb2a7078f5cd36e6007bb860b1018 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 8b10d59b608..1ecaabc42f9 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2679,9 +2679,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3780,7 +3777,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 9b7041970d4bf7d5bbf30003b5061a7f5ec2afed8c23c02d28a59e7a22805423 + checksum/config: 8287f6d4c3b3def5067c65e1497df876878161a0b519b6d782298aa27356aab3 labels: app: antrea component: antrea-agent @@ -4020,7 +4017,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 9b7041970d4bf7d5bbf30003b5061a7f5ec2afed8c23c02d28a59e7a22805423 + checksum/config: 8287f6d4c3b3def5067c65e1497df876878161a0b519b6d782298aa27356aab3 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 9c23b27f62b..96e7061f06e 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2692,9 +2692,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3793,7 +3790,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 94b202753c2ed0a7e187c486ccfbeb094e05ae7ee1f7001afb55e7c45eeeaad3 + checksum/config: c216eb3adc199d8575af41ff151ae1b566381018527dd31a46d5efbcc4c0bde6 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -4079,7 +4076,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 94b202753c2ed0a7e187c486ccfbeb094e05ae7ee1f7001afb55e7c45eeeaad3 + checksum/config: c216eb3adc199d8575af41ff151ae1b566381018527dd31a46d5efbcc4c0bde6 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 400c5dbd557..1d3fd384d8e 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2679,9 +2679,6 @@ data: # Enable certificated-based authentication for IPsec. # IPsecCertAuth: false - # Enable running agent on an unmanaged VM/BM. - # ExternalNode: false - # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -3780,7 +3777,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 17fb4f0ed9e653e3a470f41f980d2ff89a317686913f62195b6af62869779824 + checksum/config: 745551965d4087e4a0e9854549a6d96472b6eeb12c269a432ea1ae6c873c028a labels: app: antrea component: antrea-agent @@ -4020,7 +4017,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 17fb4f0ed9e653e3a470f41f980d2ff89a317686913f62195b6af62869779824 + checksum/config: 745551965d4087e4a0e9854549a6d96472b6eeb12c269a432ea1ae6c873c028a labels: app: antrea component: antrea-controller diff --git a/build/yamls/externalnode/conf/antrea-agent.conf b/build/yamls/externalnode/conf/antrea-agent.conf index 55e8015bfa2..1fda25a069f 100644 --- a/build/yamls/externalnode/conf/antrea-agent.conf +++ b/build/yamls/externalnode/conf/antrea-agent.conf @@ -32,6 +32,22 @@ featureGates: # Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". nodeType: externalNode +externalNode: + # The expected Namespace in which the ExternalNode is created. + # Defaults to "default". + #externalNodeNamespace: default + + # The policyBypassRules describes the traffic that is expected to bypass NetworkPolicy rules. + # Each rule contains the following four attributes: + # direction (ingress|egress), protocol(tcp/udp/icmp/ip), remote CIDR, dst port (ICMP doesn't require). + # Here is an example: + # - direction: ingress + # protocol: tcp + # cidr: 1.1.1.1/32 + # port: 22 + # It is used only when NodeType is externalNode. + #policyBypassRules: [] + # The path to access the kubeconfig file used in the connection to K8s APIServer. The file contains the K8s # APIServer endpoint and the token of ServiceAccount required in the connection. clientConnection: diff --git a/build/yamls/externalnode/vm-agent-rbac.yml b/build/yamls/externalnode/vm-agent-rbac.yml new file mode 100644 index 00000000000..084cb742dc7 --- /dev/null +++ b/build/yamls/externalnode/vm-agent-rbac.yml @@ -0,0 +1,112 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +rules: + # antrea-controller distributes the CA certificate as a ConfigMap named `antrea-ca` in the Antrea deployment Namespace. + # vm-agent needs to access `antrea-ca` to connect with antrea-controller. + - apiGroups: + - "" + resources: + - configmaps + resourceNames: + - antrea-ca + verbs: + - get + - watch + - list + # This is the content of built-in role kube-system/extension-apiserver-authentication-reader. + # But it doesn't have list/watch permission before K8s v1.17.0 so the extension apiserver (vm-agent) will + # have permission issue after bumping up apiserver library to a version that supports dynamic authentication. + # See https://github.com/kubernetes/kubernetes/pull/85375 + # To support K8s clusters older than v1.17.0, we grant the required permissions directly instead of relying on + # the extension-apiserver-authentication role. + - apiGroups: + - "" + resourceNames: + - extension-apiserver-authentication + resources: + - configmaps + verbs: + - get + - list + - watch + - apiGroups: + - crd.antrea.io + resources: + - antreaagentinfos + verbs: + - get + - update + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies + - appliedtogroups + - addressgroups + verbs: + - get + - watch + - list + - apiGroups: + - controlplane.antrea.io + resources: + - nodestatssummaries + verbs: + - create + - apiGroups: + - controlplane.antrea.io + resources: + - networkpolicies/status + verbs: + - create + - get +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: vm-agent +subjects: + - kind: ServiceAccount + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +rules: + - apiGroups: + - crd.antrea.io + resources: + - externalnodes + verbs: + - get + - watch + - list +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: vm-agent +subjects: + - kind: ServiceAccount + name: vm-agent + namespace: vm-ns # Change the Namespace to where vm-agent is expected to run. diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 6bad497f67a..2bd17cdea64 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -42,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" "antrea.io/antrea/pkg/agent/controller/trafficcontrol" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" @@ -60,6 +61,7 @@ import ( "antrea.io/antrea/pkg/agent/stats" agenttypes "antrea.io/antrea/pkg/agent/types" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + crdv1alpha1informers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/log" @@ -226,6 +228,7 @@ func run(o *Options) error { // Initialize agent and node network. agentInitializer := agent.NewInitializer( k8sClient, + crdClient, ovsBridgeClient, ofClient, routeClient, @@ -240,6 +243,7 @@ func run(o *Options) error { networkReadyCh, stopCh, o.nodeType, + o.config.ExternalNode.ExternalNodeNamespace, features.DefaultFeatureGate.Enabled(features.AntreaProxy), o.config.AntreaProxy.ProxyAll, connectUplinkToBridge) @@ -328,7 +332,16 @@ func run(o *Options) error { // podUpdateChannel is a channel for receiving Pod updates from CNIServer and // notifying NetworkPolicyController and EgressController to reconcile rules // related to the updated Pods. - podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + var podUpdateChannel *channel.SubscribableChannel + // externalEntityUpdateChannel is a channel for receiving ExternalEntity updates from ExternalNodeController and + // notifying NetworkPolicyController to reconcile rules related to the updated ExternalEntities. + var externalEntityUpdateChannel *channel.SubscribableChannel + if o.nodeType == config.K8sNode { + podUpdateChannel = channel.NewSubscribableChannel("PodUpdate", 100) + } else { + externalEntityUpdateChannel = channel.NewSubscribableChannel("ExternalEntityUpdate", 100) + } + // We set flow poll interval as the time interval for rule deletion in the async // rule cache, which is implemented as part of the idAllocator. This is to preserve // the rule info for populating NetworkPolicy fields in the Flow Exporter even @@ -341,12 +354,19 @@ func run(o *Options) error { statusManagerEnabled := antreaPolicyEnabled loggingEnabled := antreaPolicyEnabled + var gwPort, tunPort uint32 + if o.nodeType == config.K8sNode { + gwPort = nodeConfig.GatewayConfig.OFPort + tunPort = nodeConfig.TunnelOFPort + } + networkPolicyController, err := networkpolicy.NewNetworkPolicyController( antreaClientProvider, ofClient, ifaceStore, nodeConfig.Name, podUpdateChannel, + externalEntityUpdateChannel, groupCounters, groupIDUpdates, antreaPolicyEnabled, @@ -356,10 +376,12 @@ func run(o *Options) error { loggingEnabled, asyncRuleDeleteInterval, o.dnsServerOverride, + o.nodeType, v4Enabled, v6Enabled, - nodeConfig.GatewayConfig.OFPort, - nodeConfig.TunnelOFPort) + gwPort, + tunPort, + ) if err != nil { return fmt.Errorf("error creating new NetworkPolicy controller: %v", err) } @@ -414,6 +436,8 @@ func run(o *Options) error { var cniServer *cniserver.CNIServer var cniPodInfoStore cnipodcache.CNIPodInfoStore + var externalNodeController *externalnode.ExternalNodeController + var localExternalNodeInformer cache.SharedIndexInformer if o.nodeType == config.K8sNode { isChaining := false if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { @@ -443,6 +467,22 @@ func run(o *Options) error { return fmt.Errorf("error initializing CNI server: %v", err) } } + } else { + listOptions := func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeConfig.Name).String() + } + localExternalNodeInformer = crdv1alpha1informers.NewFilteredExternalNodeInformer( + crdClient, + o.config.ExternalNode.ExternalNodeNamespace, + resyncPeriodDisabled, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + listOptions, + ) + externalNodeController, err = externalnode.NewExternalNodeController(ovsBridgeClient, ofClient, localExternalNodeInformer, + ifaceStore, externalEntityUpdateChannel, o.config.ExternalNode.ExternalNodeNamespace, o.config.ExternalNode.PolicyBypassRules) + if err != nil { + return fmt.Errorf("error creating ExternalNode controller: %v", err) + } } var traceflowController *traceflow.Controller @@ -536,6 +576,10 @@ func run(o *Options) error { go podUpdateChannel.Run(stopCh) go cniServer.Run(stopCh) go nodeRouteController.Run(stopCh) + } else { + go externalEntityUpdateChannel.Run(stopCh) + go localExternalNodeInformer.Run(stopCh) + go externalNodeController.Run(stopCh) } if networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec && diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 55384c54cbf..b28eb05b0f7 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/pflag" "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/component-base/featuregate" "k8s.io/klog/v2" @@ -496,9 +497,37 @@ func (o *Options) validateExternalNodeOptions() error { if unsupported != nil { return fmt.Errorf("unsupported features on Virtual Machine: {%s}", strings.Join(unsupported, ", ")) } + if err := o.validatePolicyBypassRulesConfig(); err != nil { + return fmt.Errorf("policyBypassRules configuration is invalid: %w", err) + } return nil } +func (o *Options) validatePolicyBypassRulesConfig() error { + if len(o.config.ExternalNode.PolicyBypassRules) == 0 { + return nil + } + allowedProtocols := sets.NewString("tcp", "udp", "icmp", "ip") + for _, rule := range o.config.ExternalNode.PolicyBypassRules { + if rule.Direction != "ingress" && rule.Direction != "egress" { + return fmt.Errorf("direction %s for policyBypassRule is invalid", rule.Direction) + } + if !allowedProtocols.Has(rule.Protocol) { + return fmt.Errorf("protocol %s for policyBypassRule is invalid", rule.Protocol) + } + if _, _, err := net.ParseCIDR(rule.CIDR); err != nil { + return fmt.Errorf("cidr %s for policyBypassRule is invalid", rule.CIDR) + } + if rule.Port == 0 && (rule.Protocol == "tcp" || rule.Protocol == "udp") { + return fmt.Errorf("missing port for policyBypassRule when protocol is %s", rule.Protocol) + } + if rule.Port < 0 || rule.Port > 65535 { + return fmt.Errorf("port %d for policyBypassRule is invalid", rule.Port) + } + } + return nil + +} func (o *Options) setExternalNodeDefaultOptions() { // Following options are default values for agent running on a Virtual Machine. // They are set to avoid unexpected agent crash. @@ -509,4 +538,7 @@ func (o *Options) setExternalNodeDefaultOptions() { o.config.EnablePrometheusMetrics = new(bool) *o.config.EnablePrometheusMetrics = false } + if o.config.ExternalNode.ExternalNodeNamespace == "" { + o.config.ExternalNode.ExternalNodeNamespace = "default" + } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index d32cb7a28fb..914b85ee42c 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -38,6 +38,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/controller/trafficcontrol" + "antrea.io/antrea/pkg/agent/externalnode" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -45,6 +46,7 @@ import ( "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/agent/wireguard" + "antrea.io/antrea/pkg/client/clientset/versioned" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/ovs/ovsctl" @@ -82,6 +84,7 @@ var otherConfigKeysForIPsecCertificates = []string{"certificate", "private_key", // Initializer knows how to setup host networking, OpenVSwitch, and Openflow. type Initializer struct { client clientset.Interface + crdClient versioned.Interface ovsBridgeClient ovsconfig.OVSBridgeClient ofClient openflow.Client routeClient route.Interface @@ -99,14 +102,16 @@ type Initializer struct { connectUplinkToBridge bool // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. - proxyAll bool - networkReadyCh chan<- struct{} - stopCh <-chan struct{} - nodeType config.NodeType + proxyAll bool + networkReadyCh chan<- struct{} + stopCh <-chan struct{} + nodeType config.NodeType + externalNodeNamespace string } func NewInitializer( k8sClient clientset.Interface, + crdClient versioned.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -121,6 +126,7 @@ func NewInitializer( networkReadyCh chan<- struct{}, stopCh <-chan struct{}, nodeType config.NodeType, + externalNodeNamespace string, enableProxy bool, proxyAll bool, connectUplinkToBridge bool, @@ -128,6 +134,7 @@ func NewInitializer( return &Initializer{ ovsBridgeClient: ovsBridgeClient, client: k8sClient, + crdClient: crdClient, ifaceStore: ifaceStore, ofClient: ofClient, routeClient: routeClient, @@ -141,6 +148,7 @@ func NewInitializer( networkReadyCh: networkReadyCh, stopCh: stopCh, nodeType: nodeType, + externalNodeNamespace: externalNodeNamespace, enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, @@ -280,9 +288,16 @@ func (i *Initializer) initInterfaceStore() error { case interfacestore.AntreaTunnel: intf = parseTunnelInterfaceFunc(port, ovsPort) case interfacestore.AntreaHost: - // Not load the host interface, because it is configured on the OVS bridge port, and we don't need a - // specific interface in the interfaceStore. - intf = nil + if port.Name == i.ovsBridge { + // Need not to load the OVS bridge port to the interfaceStore + intf = nil + } else { + var err error + intf, err = externalnode.ParseHostInterfaceConfig(i.ovsBridgeClient, port, ovsPort) + if err != nil { + return err + } + } case interfacestore.AntreaContainer: // The port should be for a container interface. intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true) @@ -1170,11 +1185,24 @@ func (i *Initializer) initNodeLocalConfig() error { } func (i *Initializer) initVMLocalConfig(nodeName string) error { + klog.InfoS("Initializing VM config", "ExternalNode", nodeName) + if err := wait.PollImmediateUntil(10*time.Second, func() (done bool, err error) { + _, err = i.crdClient.CrdV1alpha1().ExternalNodes(i.externalNodeNamespace).Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return true, nil + }, i.stopCh); err != nil { + klog.Info("Stopped waiting for ExternalNode") + return err + } + i.nodeConfig = &config.NodeConfig{ Name: nodeName, Type: config.ExternalNode, OVSBridge: i.ovsBridge, } + klog.InfoS("Finished VM config initialization", "ExternalNode", nodeName) return nil } diff --git a/pkg/agent/agent_linux.go b/pkg/agent/agent_linux.go index 6213126ba94..df00e644a49 100644 --- a/pkg/agent/agent_linux.go +++ b/pkg/agent/agent_linux.go @@ -53,7 +53,7 @@ func (i *Initializer) prepareOVSBridgeForK8sNode() error { uplinkNetConfig := i.nodeConfig.UplinkNetConfig uplinkNetConfig.Name = adapter.Name uplinkNetConfig.MAC = adapter.HardwareAddr - uplinkNetConfig.IP = i.nodeConfig.NodeIPv4Addr + uplinkNetConfig.IPs = []*net.IPNet{i.nodeConfig.NodeIPv4Addr} uplinkNetConfig.Index = adapter.Index // Gateway and DNSServers are not configured at adapter in Linux // Limitation: dynamic DNS servers will be lost after DHCP lease expired diff --git a/pkg/agent/agent_windows.go b/pkg/agent/agent_windows.go index 0fbe4aa6099..72b9959c1c1 100644 --- a/pkg/agent/agent_windows.go +++ b/pkg/agent/agent_windows.go @@ -81,7 +81,7 @@ func (i *Initializer) prepareHNSNetworkAndOVSExtension() error { } i.nodeConfig.UplinkNetConfig.Name = adapter.Name i.nodeConfig.UplinkNetConfig.MAC = adapter.HardwareAddr - i.nodeConfig.UplinkNetConfig.IP = i.nodeConfig.NodeTransportIPv4Addr + i.nodeConfig.UplinkNetConfig.IPs = []*net.IPNet{i.nodeConfig.NodeTransportIPv4Addr} i.nodeConfig.UplinkNetConfig.Index = adapter.Index defaultGW, err := util.GetDefaultGatewayByInterfaceIndex(adapter.Index) if err != nil { diff --git a/pkg/agent/config/node_config.go b/pkg/agent/config/node_config.go index 1b6c9fc29fc..407358ab74e 100644 --- a/pkg/agent/config/node_config.go +++ b/pkg/agent/config/node_config.go @@ -103,7 +103,8 @@ type AdapterNetConfig struct { Name string Index int MAC net.HardwareAddr - IP *net.IPNet + IPs []*net.IPNet + MTU int Gateway string DNSServers string Routes []interface{} diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go index 9f38fa375f6..6730c8d2e32 100644 --- a/pkg/agent/controller/networkpolicy/cache.go +++ b/pkg/agent/controller/networkpolicy/cache.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/metrics" agenttypes "antrea.io/antrea/pkg/agent/types" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" @@ -394,7 +395,8 @@ func toIGMPReportGroupAddressIndexFunc(obj interface{}) ([]string, error) { } // newRuleCache returns a new *ruleCache. -func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache { +func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, externalEntityUpdateSubscriber channel.Subscriber, + serviceGroupIDUpdate <-chan string, nodeType config.NodeType) *ruleCache { rules := cache.NewIndexer( ruleKeyFunc, cache.Indexers{ @@ -413,14 +415,20 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub dirtyRuleHandler: dirtyRuleHandler, groupIDUpdates: serviceGroupIDUpdate, } - // Subscribe Pod update events from CNIServer. - podUpdateSubscriber.Subscribe(cache.processPodUpdate) + if nodeType == config.K8sNode { + // Subscribe Pod update events from CNIServer. + podUpdateSubscriber.Subscribe(cache.processPodUpdate) + } else { + // Subscribe ExternalEntity update events from ExternalNodeController + externalEntityUpdateSubscriber.Subscribe(cache.processExternalEntityUpdate) + } + go cache.processGroupIDUpdates() return cache } // processPodUpdate will be called when CNIServer publishes a Pod update event. -// It finds out AppliedToGroups that contains this Pod and trigger reconciling +// It finds out AppliedToGroups that contain this Pod and triggers reconciliation // of related rules. // It can enforce NetworkPolicies to newly added Pods right after CNI ADD is // done if antrea-controller has computed the Pods' policies and propagated @@ -443,6 +451,24 @@ func (c *ruleCache) processPodUpdate(e interface{}) { } } +// processExternalEntityUpdate will be called when ExternalNodeController publishes an ExternalEntity update event. +// It finds out AppliedToGroups that contain this ExternalNode converted ExternalEntity and triggers reconciliation +// of related rules. +// It can enforce NetworkPolicies to ExternalEntities after ExternalEntityInterface is realised in the interface store. +func (c *ruleCache) processExternalEntityUpdate(e interface{}) { + externalEntityRef := e.(v1beta.ExternalEntityReference) + member := &v1beta.GroupMember{ + ExternalEntity: &externalEntityRef, + } + c.appliedToSetLock.RLock() + defer c.appliedToSetLock.RUnlock() + for group, memberSet := range c.appliedToSetByGroup { + if memberSet.Has(member) { + c.onAppliedToGroupUpdate(group) + } + } +} + // processGroupIDUpdates is an infinite loop that takes Service groupID // update events from the channel, finds out rules that refer this Service in // ToServices field and use dirtyRuleHandler to re-queue these rules. diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 00c01d3ed25..9c0e0120993 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" @@ -278,7 +279,7 @@ func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, *channel.SubscribableCh recorder := newDirtyRuleRecorder() podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) ch2 := make(chan string, 100) - c := newRuleCache(recorder.Record, podUpdateChannel, ch2) + c := newRuleCache(recorder.Record, podUpdateChannel, nil, ch2, config.K8sNode) return c, recorder, podUpdateChannel } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 2e3bfdc1d22..814050eeccd 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" @@ -80,6 +81,8 @@ type Controller struct { multicastEnabled bool // loggingEnabled indicates where Antrea policy audit logging is enabled. loggingEnabled bool + // nodeType indicates type of the Node where Antrea Agent is running on. + nodeType config.NodeType // antreaClientProvider provides interfaces to get antreaClient, which can be // used to watch Antrea AddressGroups, AppliedToGroups, and NetworkPolicies. // We need to get antreaClient dynamically because the apiserver cert can be @@ -119,6 +122,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ifaceStore interfacestore.InterfaceStore, nodeName string, podUpdateSubscriber channel.Subscriber, + externalEntityUpdateSubscriber channel.Subscriber, groupCounters []proxytypes.GroupCounter, groupIDUpdates <-chan string, antreaPolicyEnabled bool, @@ -128,6 +132,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, loggingEnabled bool, asyncRuleDeleteInterval time.Duration, dnsServerOverride string, + nodeType config.NodeType, v4Enabled bool, v6Enabled bool, gwPort, tunPort uint32) (*Controller, error) { @@ -136,6 +141,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, antreaClientProvider: antreaClientGetter, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"), ofClient: ofClient, + nodeType: nodeType, antreaPolicyEnabled: antreaPolicyEnabled, antreaProxyEnabled: antreaProxyEnabled, statusManagerEnabled: statusManagerEnabled, @@ -157,7 +163,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, } c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled, multicastEnabled) - c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates) + c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalEntityUpdateSubscriber, groupIDUpdates, nodeType) if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index d22b643a305..a95e63f7725 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -61,7 +61,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator(false) groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, groupCounters, ch2, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", true, false, config.HostGatewayOFPort, config.DefaultTunOFPort) + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort) reconciler := newMockReconciler() controller.reconciler = reconciler controller.antreaPolicyLogger = nil diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index 5c5e4690b18..5034879652d 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -22,6 +22,7 @@ import ( "antrea.io/libOpenflow/protocol" "antrea.io/ofnet/ofctrl" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -122,6 +123,9 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { // response is being generated for locally-originated traffic that went through // kube-proxy and was re-injected into the bridge through antrea-gw. isServiceTraffic := func() bool { + if c.nodeType == config.ExternalNode { + return false + } if c.antreaProxyEnabled { matches := pktIn.GetMatches() if match := getMatchRegField(matches, openflow.ServiceEPStateField); match != nil { @@ -158,7 +162,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { tunPort = uint32(openflow13.P_CONTROLLER) } inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, tunPort) - mutateFunc := getRejectPacketOutMutateFunc(packetOutType) + mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType) if proto == protocol.Type_TCP { // Get TCP data. @@ -256,12 +260,19 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte case RejectServiceLocal: inPort = uint32(sIface.OFPort) case RejectPodRemoteToLocal: - inPort = gwOFPort + if dIface.Type == interfacestore.ExternalEntityInterface { + inPort = uint32(dIface.EntityInterfaceConfig.UplinkPort.OFPort) + } else { + inPort = gwOFPort + } outPort = uint32(dIface.OFPort) case RejectServiceRemoteToLocal: inPort = gwOFPort case RejectLocalToRemote: inPort = uint32(sIface.OFPort) + if sIface.Type == interfacestore.ExternalEntityInterface { + outPort = uint32(sIface.EntityInterfaceConfig.UplinkPort.OFPort) + } case RejectNoAPServiceLocal: inPort = uint32(sIface.OFPort) outPort = gwOFPort @@ -273,7 +284,7 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte } // getRejectPacketOutMutateFunc returns the mutate func of a packetOut based on the RejectType. -func getRejectPacketOutMutateFunc(rejectType RejectType) func(binding.PacketOutBuilder) binding.PacketOutBuilder { +func getRejectPacketOutMutateFunc(rejectType RejectType, nodeType config.NodeType) func(binding.PacketOutBuilder) binding.PacketOutBuilder { var mutatePacketOut func(binding.PacketOutBuilder) binding.PacketOutBuilder switch rejectType { case RejectServiceLocal: @@ -283,6 +294,10 @@ func getRejectPacketOutMutateFunc(rejectType RejectType) func(binding.PacketOutB } case RejectLocalToRemote: tableID := openflow.L3ForwardingTable.GetID() + // L3ForwardingTable is not initialized for ExternalNode case since layer 3 is not needed. + if nodeType == config.ExternalNode { + tableID = openflow.L2ForwardingCalcTable.GetID() + } mutatePacketOut = func(packetOutBuilder binding.PacketOutBuilder) binding.PacketOutBuilder { return packetOutBuilder.AddResubmitAction(nil, &tableID) } diff --git a/pkg/agent/controller/networkpolicy/status_controller_test.go b/pkg/agent/controller/networkpolicy/status_controller_test.go index d719e8baa10..8db35fa0bd9 100644 --- a/pkg/agent/controller/networkpolicy/status_controller_test.go +++ b/pkg/agent/controller/networkpolicy/status_controller_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/channel" ) @@ -51,7 +52,7 @@ func (c *fakeNetworkPolicyControl) getNetworkPolicyStatus() *v1beta2.NetworkPoli } func newTestStatusController() (*StatusController, *ruleCache, *fakeNetworkPolicyControl) { - ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), make(chan string, 100)) + ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), nil, make(chan string, 100), config.K8sNode) statusControl := &fakeNetworkPolicyControl{} statusController := newStatusController(nil, testNode1, ruleCache) statusController.statusControlInterface = statusControl diff --git a/pkg/agent/externalnode/external_node_controller.go b/pkg/agent/externalnode/external_node_controller.go new file mode 100644 index 00000000000..28d5dccbb28 --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller.go @@ -0,0 +1,682 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import ( + "fmt" + "net" + "reflect" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + enlister "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + agentConfig "antrea.io/antrea/pkg/config/agent" + binding "antrea.io/antrea/pkg/ovs/openflow" + "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/ovs/ovsctl" + "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/env" + "antrea.io/antrea/pkg/util/externalnode" + "antrea.io/antrea/pkg/util/ip" + "antrea.io/antrea/pkg/util/k8s" +) + +const ( + controllerName = "ExternalNodeController" + // How long to wait before retrying the processing of an ExternalNode change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Disable resyncing. + resyncPeriod time.Duration = 0 + + ovsExternalIDUplinkName = "uplink-name" + ovsExternalIDUplinkPort = "uplink-port" + ovsExternalIDEntityName = "entity-name" + ovsExternalIDEntityNamespace = "entity-namespace" + ovsExternalIDIPs = "ip-address" + ipsSplitter = "," +) + +var ( + keyFunc = cache.MetaNamespaceKeyFunc + splitKeyFunc = cache.SplitMetaNamespaceKey +) + +type ExternalNodeController struct { + ovsBridgeClient ovsconfig.OVSBridgeClient + ovsctlClient ovsctl.OVSCtlClient + ofClient openflow.Client + externalNodeInformer cache.SharedIndexInformer + externalNodeLister enlister.ExternalNodeLister + externalNodeListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + ifaceStore interfacestore.InterfaceStore + syncedExternalNode *v1alpha1.ExternalNode + // externalEntityUpdateNotifier is used for notifying ExternalEntity updates to NetworkPolicyController. + externalEntityUpdateNotifier channel.Notifier + nodeName string + externalNodeNamespace string + policyBypassRules []agentConfig.PolicyBypassRule +} + +func NewExternalNodeController(ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, externalNodeInformer cache.SharedIndexInformer, + ifaceStore interfacestore.InterfaceStore, externalEntityUpdateNotifier channel.Notifier, externalNodeNamespace string, policyBypassRules []agentConfig.PolicyBypassRule) (*ExternalNodeController, error) { + c := &ExternalNodeController{ + ovsBridgeClient: ovsBridgeClient, + ovsctlClient: ovsctl.NewClient(ovsBridgeClient.GetBridgeName()), + ofClient: ofClient, + externalNodeInformer: externalNodeInformer, + externalNodeLister: enlister.NewExternalNodeLister(externalNodeInformer.GetIndexer()), + externalNodeListerSynced: externalNodeInformer.HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalNode"), + ifaceStore: ifaceStore, + externalEntityUpdateNotifier: externalEntityUpdateNotifier, + policyBypassRules: policyBypassRules, + } + nodeName, err := env.GetNodeName() + if err != nil { + return nil, err + } + c.nodeName = nodeName + c.externalNodeNamespace = externalNodeNamespace + c.externalNodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueExternalNodeAdd, + UpdateFunc: c.enqueueExternalNodeUpdate, + DeleteFunc: c.enqueueExternalNodeDelete, + }, + resyncPeriod) + + return c, nil +} + +// Run will create a worker (goroutine) which will process the ExternalNode events from the work queue. +func (c *ExternalNodeController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.InfoS("Starting controller", "name", controllerName) + defer klog.InfoS("Shutting down controller", "name", controllerName) + + if err := wait.PollImmediateUntil(5*time.Second, func() (done bool, err error) { + if err = c.reconcile(); err != nil { + klog.ErrorS(err, "ExternalNodeController failed during reconciliation") + return false, nil + } + return true, nil + }, stopCh); err != nil { + klog.Info("Stopped ExternalNodeController reconciliation") + return + } + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.externalNodeListerSynced) { + klog.Error("Failed to wait for syncing ExternalNodes cache") + return + } + + c.queue.Add(k8s.NamespacedName(c.externalNodeNamespace, c.nodeName)) + go wait.Until(c.worker, time.Second, stopCh) + + <-stopCh +} + +func (c *ExternalNodeController) enqueueExternalNodeAdd(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode ADD event", "ExternalNode", klog.KObj(en)) +} + +func (c *ExternalNodeController) enqueueExternalNodeUpdate(oldObj interface{}, newObj interface{}) { + oldEN := oldObj.(*v1alpha1.ExternalNode) + newEN := newObj.(*v1alpha1.ExternalNode) + if reflect.DeepEqual(oldEN.Spec.Interfaces, newEN.Spec.Interfaces) { + klog.InfoS("Skip enqueuing ExternalNode UPDATE event as no changes for interfaces", "ExternalNode", klog.KObj(newEN)) + return + } + key, _ := keyFunc(newEN) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode UPDATE event", "ExternalNode", klog.KObj(newEN)) +} + +func (c *ExternalNodeController) enqueueExternalNodeDelete(obj interface{}) { + en := obj.(*v1alpha1.ExternalNode) + key, _ := keyFunc(en) + c.queue.Add(key) + klog.InfoS("Enqueued ExternalNode DELETE event", "ExternalNode", klog.KObj(en)) +} + +func (c *ExternalNodeController) reconcile() error { + klog.InfoS("Reconciling for controller", "name", controllerName) + if err := c.reconcileHostUplinkFlows(); err != nil { + return fmt.Errorf("failed to reconcile host uplink flows %v", err) + } + if err := c.reconcilePolicyBypassFlows(); err != nil { + return fmt.Errorf("failed to reconcile reserved flows %v", err) + } + klog.InfoS("Reconciled for controller", "name", controllerName) + return nil +} + +func (c *ExternalNodeController) reconcileHostUplinkFlows() error { + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if err := c.ofClient.InstallVMUplinkFlows(hostIface.InterfaceName, hostIface.OVSPortConfig.OFPort, hostIface.UplinkPort.OFPort); err != nil { + return err + } + klog.InfoS("Reconciled host uplink flow for ExternalEntityInterface", "ifName", hostIface.InterfaceName) + } + return nil +} + +func (c *ExternalNodeController) reconcilePolicyBypassFlows() error { + for _, rule := range c.policyBypassRules { + klog.V(2).InfoS("Installing policy bypass flows", "protocol", rule.Protocol, "CIDR", rule.CIDR, "port", rule.Port, "direction", rule.Direction) + protocol := parseProtocol(rule.Protocol) + _, ipNet, _ := net.ParseCIDR(rule.CIDR) + if err := c.ofClient.InstallPolicyBypassFlows(protocol, ipNet, uint16(rule.Port), rule.Direction == "ingress"); err != nil { + return err + } + } + klog.InfoS("Installed policy bypass flows", "RuleCount", len(c.policyBypassRules)) + return nil +} + +// worker is a long-running function that will continuously call the processNextWorkItem function in +// order to read and process a message on the work queue. +func (c *ExternalNodeController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *ExternalNodeController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + if key, ok := obj.(string); !ok { + c.queue.Forget(obj) + klog.Errorf("Expected string type in work queue but got %#v", obj) + return true + } else if err := c.syncExternalNode(key); err == nil { + // If no error occurs, then forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the work queue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing ExternalNode", "ExternalNode", key) + } + return true +} + +func (c *ExternalNodeController) syncExternalNode(key string) error { + _, name, err := splitKeyFunc(key) + if err != nil { + // This err should not occur. + return err + } + en, err := c.externalNodeLister.ExternalNodes(c.externalNodeNamespace).Get(name) + if errors.IsNotFound(err) { + return c.deleteExternalNode() + } + + if c.syncedExternalNode == nil { + return c.addExternalNode(en) + } else { + return c.updateExternalNode(c.syncedExternalNode, en) + } +} + +func (c *ExternalNodeController) addExternalNode(en *v1alpha1.ExternalNode) error { + klog.InfoS("Adding ExternalNode", "ExternalNode", klog.KObj(en)) + eeName, err := externalnode.GenExternalEntityName(en) + if err != nil { + return err + } + ifName, ips, err := getHostInterfaceName(en.Spec.Interfaces[0]) + if err != nil { + return err + } + if err := c.addInterface(ifName, en.Namespace, eeName, ips); err != nil { + return err + } + c.syncedExternalNode = en + // Notify the ExternalEntity event to NetworkPolicyController. + c.externalEntityUpdateNotifier.Notify(v1beta2.ExternalEntityReference{ + Name: eeName, + Namespace: en.Namespace, + }) + return nil +} + +func (c *ExternalNodeController) addInterface(ifName string, eeNamespace string, eeName string, ips []string) error { + hostIface, ifaceExists := c.ifaceStore.GetInterfaceByName(ifName) + if !ifaceExists { + klog.InfoS("Creating OVS ports and flows for ExternalEntityInterface", "ifName", ifName, "externalEntity", eeName, "ips", ips) + uplinkName := util.GenerateUplinkInterfaceName(ifName) + iface, err := c.createOVSPortsAndFlows(uplinkName, ifName, eeNamespace, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil + } + klog.InfoS("Updating OVS port data", "ifName", ifName, "externalEntity", eeName, "ips", ips) + portUUID := hostIface.PortUUID + portName := hostIface.InterfaceName + portData, ovsErr := c.ovsBridgeClient.GetPortData(portUUID, portName) + if ovsErr != nil { + return ovsErr + } + preEEName := portData.ExternalIDs[ovsExternalIDEntityName] + preIPs := sets.NewString(strings.Split(portData.ExternalIDs[ovsExternalIDIPs], ipsSplitter)...) + if preEEName == eeName && sets.NewString(ips...).Equal(preIPs) { + klog.InfoS("Skipping updating OVS port data as both entity name and ip are not changed", "ifName", ifName) + return nil + } + + iface, err := c.updateOVSPortsData(hostIface, portData, eeName, ips) + if err != nil { + return err + } + c.ifaceStore.AddInterface(iface) + return nil +} + +func (c *ExternalNodeController) updateExternalNode(preEN *v1alpha1.ExternalNode, curEN *v1alpha1.ExternalNode) error { + klog.InfoS("Updating ExternalNode", "ExternalNode", klog.KObj(curEN)) + if reflect.DeepEqual(preEN.Spec.Interfaces[0], curEN.Spec.Interfaces[0]) { + klog.InfoS("Skip processing ExternalNode update as no changes for Interface[0]", "ExternalNode", klog.KObj(curEN)) + return nil + } + preEEName, err := externalnode.GenExternalEntityName(preEN) + if err != nil { + return err + } + preIfName, preIPs, err := getHostInterfaceName(preEN.Spec.Interfaces[0]) + if err != nil { + return err + } + curEEName, err := externalnode.GenExternalEntityName(curEN) + if err != nil { + return err + } + curIfName, curIPs, err := getHostInterfaceName(curEN.Spec.Interfaces[0]) + if err != nil { + return err + } + if preIfName != curIfName { + klog.InfoS("Found interface name is changed", "preName", preIfName, "curName", curIfName) + if err = c.addInterface(curIfName, curEN.Namespace, curEEName, curIPs); err != nil { + return err + } + ifaceConfig, ifaceExists := c.ifaceStore.GetInterfaceByName(preIfName) + if ifaceExists { + if err = c.deleteInterface(ifaceConfig); err != nil { + return err + } + } + } else if !reflect.DeepEqual(preIPs, curIPs) || preEEName != curEEName { + klog.InfoS("Found interface configuration is changed", "preIPs", preIPs, "preExternalEntity", preEEName, + "curIPs", curIPs, "curExternalEntity", curEEName) + if err = c.addInterface(curIfName, curEN.Namespace, curEEName, curIPs); err != nil { + return err + } + } + c.syncedExternalNode = curEN + // Notify the ExternalEntity event to NetworkPolicyController. + c.externalEntityUpdateNotifier.Notify(v1beta2.ExternalEntityReference{ + Name: curEEName, + Namespace: curEN.Namespace, + }) + return nil +} + +func (c *ExternalNodeController) deleteExternalNode() error { + if err := c.deleteInterfaces(); err != nil { + return err + } + c.syncedExternalNode = nil + return nil +} + +func (c *ExternalNodeController) deleteInterfaces() error { + hostIfaces := c.ifaceStore.GetInterfacesByType(interfacestore.ExternalEntityInterface) + for _, hostIface := range hostIfaces { + if err := c.deleteInterface(hostIface); err != nil { + return err + } + } + return nil +} + +func (c *ExternalNodeController) deleteInterface(interfaceConfig *interfacestore.InterfaceConfig) error { + klog.InfoS("Deleting interface", "ifName", interfaceConfig.InterfaceName) + if err := c.removeOVSPortsAndFlows(interfaceConfig); err != nil { + return err + } + c.ifaceStore.DeleteInterface(interfaceConfig) + return nil +} + +func (c *ExternalNodeController) createOVSPortsAndFlows(uplinkName, hostIFName, eeNamespace, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + iface, addrs, routes, err := util.GetInterfaceConfig(hostIFName) + if err != nil { + return nil, err + } + adapterConfig := &config.AdapterNetConfig{ + Name: hostIFName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + MTU: iface.MTU, + } + if err = util.RenameInterface(hostIFName, uplinkName); err != nil { + return nil, err + } + success := false + defer func() { + if !success { + if err = util.RenameInterface(uplinkName, hostIFName); err != nil { + klog.ErrorS(err, "Failed to restore uplink name back to host interface name. Manual cleanup is required", "uplinkName", uplinkName, "hostIFName", hostIFName) + } + } + }() + + // Create uplink port in OVS. + uplinkExternalIDs := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaUplink, + } + uplinkUUID, ovsErr := c.ovsBridgeClient.CreatePort(uplinkName, uplinkName, uplinkExternalIDs) + if ovsErr != nil { + return nil, fmt.Errorf("failed to create uplink port %s in OVS, err %v", uplinkName, ovsErr) + } + defer func() { + if !success { + if ovsErr = c.ovsBridgeClient.DeletePort(uplinkUUID); ovsErr != nil { + klog.ErrorS(err, "Failed to delete uplink port. Manual cleanup is required", "portUUID", uplinkUUID, "uplinkName", uplinkName) + } + } + }() + uplinkOFPort, ovsErr := c.ovsBridgeClient.GetOFPort(uplinkName, false) + if ovsErr != nil { + return nil, ovsErr + } + klog.InfoS("Added uplink port in OVS", "port", uplinkOFPort, "uplinkName", uplinkName) + + // Create host port in OVS. + attachInfo := GetOVSAttachInfo(uplinkName, uplinkUUID, eeName, eeNamespace, ips) + hostIfUUID, ovsErr := c.ovsBridgeClient.CreateInternalPort(hostIFName, 0, adapterConfig.MAC.String(), attachInfo) + if ovsErr != nil { + return nil, fmt.Errorf("failed to create OVS internal port for host interface %s, err %v", hostIFName, ovsErr) + } + defer func() { + if !success { + if ovsErr = c.ovsBridgeClient.DeletePort(hostIfUUID); ovsErr != nil { + klog.ErrorS(err, "Failed to delete host interface port. Manual cleanup is required", "portUUID", hostIfUUID, "hostIFName", hostIFName) + } + } + }() + hostOFPort, ovsErr := c.ovsBridgeClient.GetOFPort(hostIFName, false) + if ovsErr != nil { + return nil, ovsErr + } + klog.InfoS("Created an OVS internal port for host interface", "ofPort", hostOFPort, "interfaceName", hostIFName) + // Move configurations from the uplink to host port + if err = c.moveIFConfigurations(adapterConfig, uplinkName, hostIFName); err != nil { + return nil, err + } + klog.InfoS("Moved configurations to the host interface", "hostInterface", hostIFName) + if err = c.ofClient.InstallVMUplinkFlows(hostIFName, hostOFPort, uplinkOFPort); err != nil { + return nil, err + } + klog.InfoS("Added uplink and host port in OVS and installed openflow entries", "uplink", uplinkName, "hostInterface", hostIFName) + success = true + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + hostIFConfig := &interfacestore.InterfaceConfig{ + Type: interfacestore.ExternalEntityInterface, + InterfaceName: hostIFName, + IPs: ifIPs, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: hostIfUUID, + OFPort: hostOFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: eeNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkUUID, + OFPort: uplinkOFPort, + }, + }, + } + return hostIFConfig, nil +} + +func GetOVSAttachInfo(uplinkName, uplinkUUID, entityName, entityNamespace string, ips []string) map[string]interface{} { + attachInfo := map[string]interface{}{ + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + if uplinkName != "" { + attachInfo[ovsExternalIDUplinkName] = uplinkName + } + if uplinkUUID != "" { + attachInfo[ovsExternalIDUplinkPort] = uplinkUUID + } + if entityName != "" { + attachInfo[ovsExternalIDEntityName] = entityName + } + if entityNamespace != "" { + attachInfo[ovsExternalIDEntityNamespace] = entityNamespace + } + if len(ips) != 0 { + attachInfo[ovsExternalIDIPs] = strings.Join(ips, ipsSplitter) + } + + return attachInfo +} + +func (c *ExternalNodeController) updateOVSPortsData(interfaceConfig *interfacestore.InterfaceConfig, portData *ovsconfig.OVSPortData, eeName string, ips []string) (*interfacestore.InterfaceConfig, error) { + attachInfo := map[string]interface{}{ + ovsExternalIDUplinkName: portData.ExternalIDs[ovsExternalIDUplinkName], + ovsExternalIDUplinkPort: portData.ExternalIDs[ovsExternalIDUplinkPort], + ovsExternalIDEntityName: eeName, + ovsExternalIDEntityNamespace: portData.ExternalIDs[ovsExternalIDEntityNamespace], + ovsExternalIDIPs: strings.Join(ips, ipsSplitter), + interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaHost, + } + err := c.ovsBridgeClient.SetPortExternalIDs(interfaceConfig.InterfaceName, attachInfo) + if err != nil { + return nil, err + } + ifIPs := make([]net.IP, 0) + for _, ip := range ips { + ifIPs = append(ifIPs, net.ParseIP(ip)) + } + iface := &interfacestore.InterfaceConfig{ + InterfaceName: interfaceConfig.InterfaceName, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.PortUUID, + OFPort: interfaceConfig.OFPort, + }, + EntityInterfaceConfig: &interfacestore.EntityInterfaceConfig{ + EntityName: eeName, + EntityNamespace: interfaceConfig.EntityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: interfaceConfig.UplinkPort.PortUUID, + OFPort: interfaceConfig.UplinkPort.OFPort, + }, + }, + IPs: ifIPs, + } + return iface, nil +} + +func (c *ExternalNodeController) removeOVSPortsAndFlows(interfaceConfig *interfacestore.InterfaceConfig) error { + portUUID := interfaceConfig.PortUUID + portName := interfaceConfig.InterfaceName + if err := c.ofClient.UninstallVMUplinkFlows(portName); err != nil { + return fmt.Errorf("failed to uninstall uplink and host port openflow entries, portName %s, err %v", portName, err) + } + klog.InfoS("Removed the flows installed to forward packet between uplinkPort and hostPort", "hostInterface", portName) + hostIFName := interfaceConfig.InterfaceName + uplinkIfName := util.GenerateUplinkInterfaceName(portName) + uplinkPortID := interfaceConfig.UplinkPort.PortUUID + iface, addrs, routes, err := util.GetInterfaceConfig(hostIFName) + if err != nil { + return err + } + adapterConfig := &config.AdapterNetConfig{ + Name: hostIFName, + Index: iface.Index, + MAC: iface.HardwareAddr, + IPs: addrs, + Routes: routes, + MTU: iface.MTU, + } + if ovsErr := c.ovsBridgeClient.DeletePort(portUUID); ovsErr != nil { + return fmt.Errorf("failed to delete host port %s, err %v", hostIFName, ovsErr) + } + klog.InfoS("Deleted host port in OVS", "hostInterface", hostIFName) + if ovsErr := c.ovsBridgeClient.DeletePort(uplinkPortID); ovsErr != nil { + return fmt.Errorf("failed to delete uplink port %s, err %v", uplinkIfName, ovsErr) + } + klog.InfoS("Deleted uplink port in OVS", "uplinkIfName", uplinkIfName) + defer func() { + // Delete host interface from OVS datapath if it exists. + // This is to resolve an issue that OVS fails to remove the interface from datapath. It might happen because the interface + // is busy when OVS tries to remove it with the OVSDB interface deletion event. + if err := c.ovsctlClient.DeleteDPInterface(hostIFName); err != nil { + klog.ErrorS(err, "Failed to delete host interface from OVS datapath", "interface", hostIFName) + } + }() + + // Wait until the host interface created by OVS is removed. + if err = wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (bool, error) { + return !util.HostInterfaceExists(hostIFName), nil + }); err != nil { + return fmt.Errorf("failed to wait for host interface %s deletion in 2s, err %v", hostIFName, err) + } + // Recover the uplink interface's name. + if err = util.RenameInterface(uplinkIfName, hostIFName); err != nil { + return err + } + klog.InfoS("Recovered uplink name to the host interface name", "uplinkIfName", uplinkIfName, "hostInterface", hostIFName) + // Move the IP configurations back to the host interface. + if err = c.moveIFConfigurations(adapterConfig, "", hostIFName); err != nil { + return err + } + klog.InfoS("Moved back configuration to the host interface", "hostInterface", hostIFName) + return nil +} + +func getHostInterfaceName(iface v1alpha1.NetworkInterface) (string, []string, error) { + ifName := "" + ips := sets.NewString() + for _, ipStr := range iface.IPs { + var ipFilter *ip.DualStackIPs + ifIP := net.ParseIP(ipStr) + if ifIP.To4() != nil { + ipFilter = &ip.DualStackIPs{IPv4: ifIP} + } else { + ipFilter = &ip.DualStackIPs{IPv6: ifIP} + } + _, _, link, err := util.GetIPNetDeviceFromIP(ipFilter, sets.NewString()) + if err == nil { + klog.InfoS("Using the interface", "linkName", link.Name, "IP", ipStr) + ips.Insert(ipStr) + if ifName == "" { + ifName = link.Name + } else if ifName != link.Name { + return "", ips.List(), fmt.Errorf("find different interfaces by IPs, ifName %s, linkName %s", ifName, link.Name) + } + } else { + klog.ErrorS(err, "Failed to get device from IP", "ip", ifIP) + } + } + if ifName == "" { + return "", ips.List(), fmt.Errorf("cannot find interface via IPs %v", iface.IPs) + } + return ifName, ips.List(), nil + +} + +func ParseHostInterfaceConfig(ovsBridgeClient ovsconfig.OVSBridgeClient, portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) (*interfacestore.InterfaceConfig, error) { + var interfaceConfig *interfacestore.InterfaceConfig + interfaceConfig = &interfacestore.InterfaceConfig{ + InterfaceName: portData.Name, + Type: interfacestore.ExternalEntityInterface, + OVSPortConfig: portConfig, + } + var hostUplinkConfig *interfacestore.EntityInterfaceConfig + entityIPArr := strings.Split(portData.ExternalIDs[ovsExternalIDIPs], ipsSplitter) + var entityIPs []net.IP + for _, ipStr := range entityIPArr { + entityIPs = append(entityIPs, net.ParseIP(ipStr)) + } + interfaceConfig.IPs = entityIPs + uplinkName, _ := portData.ExternalIDs[ovsExternalIDUplinkName] + uplinkPortUUID, _ := portData.ExternalIDs[ovsExternalIDUplinkPort] + uplinkPortData, ovsErr := ovsBridgeClient.GetPortData(uplinkPortUUID, uplinkName) + if ovsErr != nil { + return nil, ovsErr + } + entityName, _ := portData.ExternalIDs[ovsExternalIDEntityName] + entityNamespace, _ := portData.ExternalIDs[ovsExternalIDEntityNamespace] + hostUplinkConfig = &interfacestore.EntityInterfaceConfig{ + EntityName: entityName, + EntityNamespace: entityNamespace, + UplinkPort: &interfacestore.OVSPortConfig{ + PortUUID: uplinkPortUUID, + OFPort: uplinkPortData.OFPort, + }, + } + interfaceConfig.EntityInterfaceConfig = hostUplinkConfig + return interfaceConfig, nil +} + +func parseProtocol(protocol string) binding.Protocol { + var proto binding.Protocol + switch protocol { + case "tcp": + proto = binding.ProtocolTCP + case "udp": + proto = binding.ProtocolUDP + case "icmp": + proto = binding.ProtocolICMP + case "ip": + proto = binding.ProtocolIP + } + return proto +} diff --git a/pkg/agent/externalnode/external_node_controller_linux.go b/pkg/agent/externalnode/external_node_controller_linux.go new file mode 100644 index 00000000000..69758733dac --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller_linux.go @@ -0,0 +1,59 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import ( + "fmt" + + "github.com/vishvananda/netlink" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/util" +) + +func (c *ExternalNodeController) moveIFConfigurations(adapterConfig *config.AdapterNetConfig, src string, dst string) error { + dstLink, err := netlink.LinkByName(dst) + if err != nil { + return fmt.Errorf("failed to find link for destination %s, err %v", dst, err) + } + if src != "" { + srcLink, err := netlink.LinkByName(src) + if err != nil { + return fmt.Errorf("failed to find link for source %s, err %v", src, err) + } + if err := netlink.LinkSetMTU(dstLink, adapterConfig.MTU); err != nil { + return err + } + if err := netlink.LinkSetUp(dstLink); err != nil { + return err + } + if err := util.RemoveLinkIPs(srcLink); err != nil { + return err + } + if err := util.RemoveLinkRoutes(srcLink); err != nil { + return err + } + } + dstIndex := dstLink.Attrs().Index + // Configure the source interface's IPs on the destination interface. + if err := util.ConfigureLinkAddresses(dstIndex, adapterConfig.IPs); err != nil { + return err + } + // Configure the source interface's routes on the destination interface. + if err := util.ConfigureLinkRoutes(dstLink, adapterConfig.Routes); err != nil { + return err + } + return nil +} diff --git a/pkg/agent/externalnode/external_node_controller_windows.go b/pkg/agent/externalnode/external_node_controller_windows.go new file mode 100644 index 00000000000..390133de5af --- /dev/null +++ b/pkg/agent/externalnode/external_node_controller_windows.go @@ -0,0 +1,21 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalnode + +import "antrea.io/antrea/pkg/agent/config" + +func (c *ExternalNodeController) moveIFConfigurations(adapterConfig *config.AdapterNetConfig, src string, dst string) error { + return nil +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index b90ca452481..45f54003c9f 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -89,8 +89,6 @@ type EntityInterfaceConfig struct { EntityNamespace string // UplinkPort is the OVS port configuration for the uplink, which is a pair port of this interface on OVS. UplinkPort *OVSPortConfig - // HostIfaceIndex is the index of the host interface created by this OVS internal port. - HostIfaceIndex int } type InterfaceConfig struct { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 2eed43f7c61..ce44be7f967 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -337,7 +337,7 @@ type Client interface { // InstallPolicyBypassFlows installs flows to bypass the NetworkPolicy rules on the traffic with the given ipnet // or ip, port, protocol and direction. It is used to bypass NetworkPolicy enforcement on a VM for the particular // traffic. - InstallPolicyBypassFlows(protocol binding.Protocol, ipnet *net.IPNet, ip net.IP, port uint16, isIngress bool) error + InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error } // GetFlowTableStatus returns an array of flow table status. diff --git a/pkg/agent/openflow/externalnode_connectivity.go b/pkg/agent/openflow/externalnode_connectivity.go index 98e40fcc439..26653edaade 100644 --- a/pkg/agent/openflow/externalnode_connectivity.go +++ b/pkg/agent/openflow/externalnode_connectivity.go @@ -152,7 +152,7 @@ func (f *featureExternalNodeConnectivity) replayFlows() []binding.Flow { return flows } -func (f *featureExternalNodeConnectivity) policyBypassFlow(protocol binding.Protocol, ipnet *net.IPNet, ip net.IP, port uint16, isIngress bool) binding.Flow { +func (f *featureExternalNodeConnectivity) policyBypassFlow(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() var flowBuilder binding.FlowBuilder var nextTable *Table @@ -161,26 +161,16 @@ func (f *featureExternalNodeConnectivity) policyBypassFlow(protocol binding.Prot Cookie(cookieID). MatchProtocol(protocol). MatchCTStateNew(true). - MatchCTStateTrk(true) - if ipnet != nil { - flowBuilder.MatchSrcIPNet(*ipnet) - } - if ip != nil { - flowBuilder.MatchSrcIP(ip) - } + MatchCTStateTrk(true). + MatchSrcIPNet(*ipNet) nextTable = IngressMetricTable } else { flowBuilder = EgressSecurityClassifierTable.ofTable.BuildFlow(priorityNormal). Cookie(cookieID). MatchProtocol(protocol). MatchCTStateNew(true). - MatchCTStateTrk(true) - if ipnet != nil { - flowBuilder.MatchDstIPNet(*ipnet) - } - if ip != nil { - flowBuilder.MatchDstIP(ip) - } + MatchCTStateTrk(true). + MatchDstIPNet(*ipNet) nextTable = EgressMetricTable } return flowBuilder.MatchDstPort(port, nil). @@ -210,8 +200,8 @@ func (c *client) UninstallVMUplinkFlows(hostIFName string) error { return c.deleteFlows(c.featureExternalNodeConnectivity.uplinkFlowCache, hostIFName) } -func (c *client) InstallPolicyBypassFlows(protocol binding.Protocol, ipnet *net.IPNet, ip net.IP, port uint16, isIngress bool) error { - flow := c.featureExternalNodeConnectivity.policyBypassFlow(protocol, ipnet, ip, port, isIngress) +func (c *client) InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error { + flow := c.featureExternalNodeConnectivity.policyBypassFlow(protocol, ipNet, port, isIngress) if err := c.ofEntryOperations.Add(flow); err != nil { return err } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 3a91924563c..1e4f646f1c2 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -409,17 +409,17 @@ func (mr *MockClientMockRecorder) InstallPodSNATFlows(arg0, arg1, arg2 interface } // InstallPolicyBypassFlows mocks base method -func (m *MockClient) InstallPolicyBypassFlows(arg0 openflow.Protocol, arg1 *net.IPNet, arg2 net.IP, arg3 uint16, arg4 bool) error { +func (m *MockClient) InstallPolicyBypassFlows(arg0 openflow.Protocol, arg1 *net.IPNet, arg2 uint16, arg3 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallPolicyBypassFlows", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "InstallPolicyBypassFlows", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // InstallPolicyBypassFlows indicates an expected call of InstallPolicyBypassFlows -func (mr *MockClientMockRecorder) InstallPolicyBypassFlows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallPolicyBypassFlows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyBypassFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyBypassFlows), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyBypassFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyBypassFlows), arg0, arg1, arg2, arg3) } // InstallPolicyRuleFlows mocks base method diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 9d267283111..12f500d6202 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -415,3 +415,17 @@ func GenerateRandomMAC() net.HardwareAddr { buf[0] |= 2 return buf } + +func GetIPNetsByLink(link *net.Interface) ([]*net.IPNet, error) { + addrList, err := link.Addrs() + if err != nil { + return nil, err + } + var addrs []*net.IPNet + for _, a := range addrList { + if ipNet, ok := a.(*net.IPNet); ok { + addrs = append(addrs, ipNet) + } + } + return addrs, nil +} diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go index 4ac5220e7e6..94dc5ebe988 100644 --- a/pkg/agent/util/net_linux.go +++ b/pkg/agent/util/net_linux.go @@ -23,6 +23,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "time" "github.com/containernetworking/plugins/pkg/ip" @@ -193,7 +194,7 @@ func ConfigureLinkAddresses(idx int, ipNets []*net.IPNet) error { for _, addr := range addrsToAdd { klog.V(2).Infof("Adding address %v to interface %s", addr, ifaceName) - if err := netlink.AddrAdd(link, addr); err != nil { + if err := netlink.AddrAdd(link, addr); err != nil && !strings.Contains(err.Error(), "file exists") { return fmt.Errorf("failed to add address %v to interface %s: %v", addr, ifaceName, err) } } @@ -240,6 +241,34 @@ func DeleteOVSPort(brName, portName string) error { return cmd.Run() } +func HostInterfaceExists(ifName string) bool { + _, err := netlink.LinkByName(ifName) + if err == nil { + return true + } + if _, ok := err.(netlink.LinkNotFoundError); ok { + return false + } + klog.ErrorS(err, "Failed to find host interface", "name", ifName) + return false +} + +func GetInterfaceConfig(ifName string) (*net.Interface, []*net.IPNet, []interface{}, error) { + iface, err := net.InterfaceByName(ifName) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to get interface by name %s, err %v", ifName, err) + } + addrs, err := GetIPNetsByLink(iface) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to get address for interface %s, err %v", ifName, err) + } + routes, err := getRoutesOnInterface(iface.Index) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to get routes for iface.Index %d, err %v", iface.Index, err) + } + return iface, addrs, routes, nil +} + func RenameInterface(from, to string) error { klog.InfoS("Renaming interface", "oldName", from, "newName", to) var renameErr error @@ -257,6 +286,59 @@ func RenameInterface(from, to string) error { return nil } +func RemoveLinkIPs(link netlink.Link) error { + addrs, err := netlink.AddrList(link, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range addrs { + if err = netlink.AddrDel(link, &addrs[i]); err != nil { + return err + } + } + return nil +} + +func RemoveLinkRoutes(link netlink.Link) error { + routes, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return err + } + for i := range routes { + if err = netlink.RouteDel(&routes[i]); err != nil { + return err + } + } + return nil +} + +func ConfigureLinkRoutes(link netlink.Link, routes []interface{}) error { + for _, r := range routes { + rt := r.(netlink.Route) + rt.LinkIndex = link.Attrs().Index + if err := netlink.RouteReplace(&rt); err != nil { + return err + } + } + return nil +} + +func getRoutesOnInterface(linkIndex int) ([]interface{}, error) { + link, err := netlink.LinkByIndex(linkIndex) + if err != nil { + return nil, err + } + rs, err := netlink.RouteList(link, netlink.FAMILY_ALL) + if err != nil { + return nil, err + } + var routes []interface{} + for _, r := range rs { + routes = append(routes, r) + } + return routes, nil +} + func renameHostInterface(oriName string, newName string) error { link, err := netlink.LinkByName(oriName) if err != nil { diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index ac5abc8c7a5..fe3476a1be9 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -895,3 +895,13 @@ func ReplaceNetNeighbor(neighbor *Neighbor) error { func VirtualAdapterName(name string) string { return fmt.Sprintf("%s (%s)", ContainerVNICPrefix, name) } + +// TODO: Implement GetInterfaceConfig for Windows +func GetInterfaceConfig(ifName string) (*net.Interface, []*net.IPNet, []interface{}, error) { + return nil, nil, nil, nil +} + +// TODO: Implement RenameInterface for Windows +func RenameInterface(from, to string) error { + return nil +} diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 321e6a95ef8..afaebdec506 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -204,6 +204,8 @@ type AgentConfig struct { // NodeType is type of the Node where Antrea Agent is running. // Defaults to "k8sNode". Valid values include "k8sNode", and "externalNode". NodeType string `yaml:"nodeType,omitempty"` + // ExternalNode related configurations. + ExternalNode ExternalNodeConfig `yaml:"externalNode,omitempty"` } type AntreaProxyConfig struct { @@ -274,3 +276,26 @@ type MulticlusterConfig struct { // The default is antrea-agent's Namespace. Namespace string `yaml:"namespace,omitempty"` } + +type ExternalNodeConfig struct { + // The expected Namespace in which the ExternalNode should be created for a VM or baremetal server Node. + // The default value is "default". + // It is used only when NodeType is externalNode. + ExternalNodeNamespace string `yaml:"externalNodeNamespace,omitempty"` + // The policy bypass rules define traffic that should bypass NetworkPolicy rules. + // Each rule contains the following four attributes: + // direction (ingress|egress), protocol(tcp/udp/icmp/ip), remote CIDR, dst port (ICMP doesn't require), + // It is used only when NodeType is externalNode. + PolicyBypassRules []PolicyBypassRule `yaml:"policyBypassRules,omitempty"` +} + +type PolicyBypassRule struct { + // The direction value can be ingress or egress. + Direction string `yaml:"direction,omitempty"` + // The protocol which traffic must match. Supported values are TCP, UDP, ICMP and IP. + Protocol string `yaml:"protocol,omitempty"` + // CIDR marks the destination CIDR for Egress and source CIDR for Ingress. + CIDR string `json:"cidr,omitempty"` + // The destination port of the given protocol. + Port int `yaml:"port,omitempty"` +} diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 418622ef765..b9ed508a5ca 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -1145,7 +1145,7 @@ func (n *NetworkPolicyController) getMemberSetForGroupType(groupType grouping.Gr groupMemberSet.Insert(podToGroupMember(pod, true)) } for _, ee := range externalEntities { - groupMemberSet.Insert(externalEntityToGroupMember(ee)) + groupMemberSet.Insert(externalEntityToGroupMember(ee, true)) } return groupMemberSet } @@ -1194,10 +1194,9 @@ func nodeToGroupMember(node *v1.Node) (member *controlplane.GroupMember) { return } -func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity) *controlplane.GroupMember { +func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity, includeIP bool) *controlplane.GroupMember { memberEntity := &controlplane.GroupMember{} namedPorts := make([]controlplane.NamedPort, len(ee.Spec.Ports)) - var ips []controlplane.IPAddress for i, port := range ee.Spec.Ports { namedPorts[i] = controlplane.NamedPort{ Port: port.Port, @@ -1205,8 +1204,10 @@ func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity) *controlplane.Grou Protocol: controlplane.Protocol(port.Protocol), } } - for _, ep := range ee.Spec.Endpoints { - ips = append(ips, ipStrToIPAddress(ep.IP)) + if includeIP { + for _, ep := range ee.Spec.Endpoints { + memberEntity.IPs = append(memberEntity.IPs, ipStrToIPAddress(ep.IP)) + } } eeRef := controlplane.ExternalEntityReference{ Name: ee.Name, @@ -1214,7 +1215,6 @@ func externalEntityToGroupMember(ee *v1alpha2.ExternalEntity) *controlplane.Grou } memberEntity.ExternalEntity = &eeRef memberEntity.Ports = namedPorts - memberEntity.IPs = ips return memberEntity } @@ -1266,7 +1266,7 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error { if entitySet == nil { entitySet = controlplane.GroupMemberSet{} } - entitySet.Insert(externalEntityToGroupMember(extEntity)) + entitySet.Insert(externalEntityToGroupMember(extEntity, false)) memberSetByNode[extEntity.Spec.ExternalNode] = entitySet appGroupNodeNames.Insert(extEntity.Spec.ExternalNode) } diff --git a/pkg/ovs/ovsctl/appctl.go b/pkg/ovs/ovsctl/appctl.go index cf1e36eb681..5ac563e49b5 100644 --- a/pkg/ovs/ovsctl/appctl.go +++ b/pkg/ovs/ovsctl/appctl.go @@ -19,6 +19,7 @@ import ( "bytes" "fmt" "net" + "strconv" "strings" "k8s.io/klog/v2" @@ -227,3 +228,38 @@ func (c *ovsCtlClient) GetDPFeatures() (map[DPFeature]bool, error) { } return features, nil } + +// DeleteDPInterface deletes OVS datapath interface, and it returns with no error if the interface does not exist. +func (c *ovsCtlClient) DeleteDPInterface(name string) error { + cmd := fmt.Sprintf("dpctl/show ovs-system") + out, execErr := c.runAppCtl(cmd, false) + if execErr != nil { + return execErr + } + scanner := bufio.NewScanner(strings.NewReader(string(out))) + scanner.Split(bufio.ScanLines) + for scanner.Scan() { + line := scanner.Text() + fields := strings.Split(line, ": ") + if len(fields) < 2 { + continue + } + nameStr := fields[1] + ifName := strings.Split(nameStr, " (internal)")[0] + if ifName == name { + portStr := strings.Split(fields[0], " ")[1] + port, err := strconv.Atoi(portStr) + if err != nil { + return fmt.Errorf("failed to parse portNum from portStr %s, line %s", portStr, line) + } + cmd = fmt.Sprintf("dpctl/del-if ovs-system %d", port) + _, execErr = c.runAppCtl(cmd, false) + if execErr == nil || strings.Contains(execErr.Error(), "No such device") { + return nil + } else { + return execErr + } + } + } + return nil +} diff --git a/pkg/ovs/ovsctl/interface.go b/pkg/ovs/ovsctl/interface.go index b6c74088621..c925e7c5db4 100644 --- a/pkg/ovs/ovsctl/interface.go +++ b/pkg/ovs/ovsctl/interface.go @@ -60,6 +60,8 @@ type OVSCtlClient interface { RunAppctlCmd(cmd string, needsBridge bool, args ...string) ([]byte, *ExecError) // GetDPFeatures executes "ovs-appctl dpif/show-dp-features" to check supported DP features. GetDPFeatures() (map[DPFeature]bool, error) + // DeleteDPInterface executes "ovs-appctl dpctl/del-if ovs-system $name" to delete OVS datapath interface. + DeleteDPInterface(name string) error } type BadRequestError string diff --git a/pkg/ovs/ovsctl/testing/mock_ovsctl.go b/pkg/ovs/ovsctl/testing/mock_ovsctl.go index 05e0663d8a9..fc8b37f2c35 100644 --- a/pkg/ovs/ovsctl/testing/mock_ovsctl.go +++ b/pkg/ovs/ovsctl/testing/mock_ovsctl.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,6 +48,20 @@ func (m *MockOVSCtlClient) EXPECT() *MockOVSCtlClientMockRecorder { return m.recorder } +// DeleteDPInterface mocks base method +func (m *MockOVSCtlClient) DeleteDPInterface(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteDPInterface", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteDPInterface indicates an expected call of DeleteDPInterface +func (mr *MockOVSCtlClientMockRecorder) DeleteDPInterface(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteDPInterface", reflect.TypeOf((*MockOVSCtlClient)(nil).DeleteDPInterface), arg0) +} + // DumpFlows mocks base method func (m *MockOVSCtlClient) DumpFlows(arg0 ...string) ([]string, error) { m.ctrl.T.Helper() diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index ca8b25a9c9d..e81c2d8739b 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -196,9 +196,11 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { Name: "fake-uplink", Index: 0, MAC: uplinkMAC, - IP: &net.IPNet{ - IP: nil, - Mask: nil, + IPs: []*net.IPNet{ + { + IP: nil, + Mask: nil, + }, }, Gateway: "", DNSServers: "",