diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 6b3d5f198cd..66247216af2 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -12,10 +12,14 @@ featureGates: {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "AntreaProxy" "default" true) }} # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice -# API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, +# API version v1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, # this flag will not take effect. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "EndpointSlice" "default" false) }} +# Enable TopologyAwareHints in AntreaProxy. This requires AntreaProxy and EndpointSlice to be +# enabled, otherwise this flag will not take effect. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "TopologyAwareHints" "default" false) }} + # Enable traceflow which provides packet tracing feature to diagnose network issue. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Traceflow" "default" true) }} diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index bb41cc55802..9488aecd104 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2549,10 +2549,14 @@ data: # AntreaProxy: true # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice - # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # API version v1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, # this flag will not take effect. # EndpointSlice: false + # Enable TopologyAwareHints in AntreaProxy. This requires AntreaProxy and EndpointSlice to be + # enabled, otherwise this flag will not take effect. + # TopologyAwareHints: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -3688,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 890f1364c9b89811375830c94fab2fa9f1957518351cc52c623e22b6964e5e75 + checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 labels: app: antrea component: antrea-agent @@ -3928,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 890f1364c9b89811375830c94fab2fa9f1957518351cc52c623e22b6964e5e75 + checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 45eb18a7d01..25e27b37618 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2549,10 +2549,14 @@ data: # AntreaProxy: true # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice - # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # API version v1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, # this flag will not take effect. # EndpointSlice: false + # Enable TopologyAwareHints in AntreaProxy. This requires AntreaProxy and EndpointSlice to be + # enabled, otherwise this flag will not take effect. + # TopologyAwareHints: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -3688,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 890f1364c9b89811375830c94fab2fa9f1957518351cc52c623e22b6964e5e75 + checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 labels: app: antrea component: antrea-agent @@ -3930,7 +3934,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 890f1364c9b89811375830c94fab2fa9f1957518351cc52c623e22b6964e5e75 + checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 75f23b9a46e..3a5dbc75ac4 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2549,10 +2549,14 @@ data: # AntreaProxy: true # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice - # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # API version v1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, # this flag will not take effect. # EndpointSlice: false + # Enable TopologyAwareHints in AntreaProxy. This requires AntreaProxy and EndpointSlice to be + # enabled, otherwise this flag will not take effect. + # TopologyAwareHints: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -3688,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: cc8af4219d403a137ab87500ae0ab15b681fc635e41057b5623df6154443fddf + checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 labels: app: antrea component: antrea-agent @@ -3928,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: cc8af4219d403a137ab87500ae0ab15b681fc635e41057b5623df6154443fddf + checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 785e8f135d9..06a718aec3c 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2562,10 +2562,14 @@ data: # AntreaProxy: true # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice - # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # API version v1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, # this flag will not take effect. # EndpointSlice: false + # Enable TopologyAwareHints in AntreaProxy. This requires AntreaProxy and EndpointSlice to be + # enabled, otherwise this flag will not take effect. + # TopologyAwareHints: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -3701,7 +3705,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: df5271a5c42a550d3f8e73fbe8e5fad8d178884fb74d81c7322128187546db86 + checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -3987,7 +3991,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: df5271a5c42a550d3f8e73fbe8e5fad8d178884fb74d81c7322128187546db86 + checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 4fcbc4001f7..69e328f1dd2 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2549,10 +2549,14 @@ data: # AntreaProxy: true # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice - # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # API version v1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, # this flag will not take effect. # EndpointSlice: false + # Enable TopologyAwareHints in AntreaProxy. This requires AntreaProxy and EndpointSlice to be + # enabled, otherwise this flag will not take effect. + # TopologyAwareHints: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -3688,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 033b7f8c7b77a918ad1a90c3db034bbfc1df67de264a77f8aee6a035836b6812 + checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 labels: app: antrea component: antrea-agent @@ -3928,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 033b7f8c7b77a918ad1a90c3db034bbfc1df67de264a77f8aee6a035836b6812 + checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 labels: app: antrea component: antrea-controller diff --git a/docs/feature-gates.md b/docs/feature-gates.md index e9678bbd884..88299c2c2bf 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -34,9 +34,10 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the ## List of Available Features | Feature Name | Component | Default | Stage | Alpha Release | Beta Release | GA Release | Extra Requirements | Notes | -| ----------------------- | ------------------ | ------- | ----- | ------------- | ------------ | ---------- | ------------------ | ----- | +| ----------------------- | ------------------ | ------- | ----- |---------------| ------------ | ---------- | ------------------ | ----- | | `AntreaProxy` | Agent | `true` | Beta | v0.8 | v0.11 | N/A | Yes | Must be enabled for Windows. | | `EndpointSlice` | Agent | `false` | Alpha | v0.13.0 | N/A | N/A | Yes | | +| `TopologyAwareHints` | Agent | `false` | Alpha | v1.8 | N/A | N/A | Yes | | | `AntreaPolicy` | Agent + Controller | `true` | Beta | v0.8 | v1.0 | N/A | No | Agent side config required from v0.9.0+. | | `Traceflow` | Agent + Controller | `true` | Beta | v0.8 | v0.11 | N/A | Yes | | | `FlowExporter` | Agent | `false` | Alpha | v0.9 | N/A | N/A | Yes | | @@ -90,6 +91,19 @@ and will not implement Cluster IP functionality as expected. When using the OVS built-in kernel module (which is the most common case), your kernel version must be >= 4.6 (as opposed to >= 4.4 without this feature). +### TopologyAwareHints + +`TopologyAwareHints` enables TopologyAwareHints support in AntreaProxy. The feature +TopologyAwareHints is at beta stage in Kubernetes 1.23 (beta), and it is enabled by +default in Kubernetes 1.24. For AntreaProxy, traffic can be routed to the Endpoint +which is closer to its origin with this feature. Refer to this +[link](https://kubernetes.io/docs/concepts/services-networking/topology-aware-hints/) +for more information. + +#### Requirements for this Feature + +Feature EndpointSlice is enabled. + ### AntreaPolicy `AntreaPolicy` enables Antrea ClusterNetworkPolicy and Antrea NetworkPolicy CRDs to be diff --git a/pkg/agent/proxy/endpoints.go b/pkg/agent/proxy/endpoints.go index 1ce38df9360..152ac6b0a3b 100644 --- a/pkg/agent/proxy/endpoints.go +++ b/pkg/agent/proxy/endpoints.go @@ -21,7 +21,7 @@ import ( "sync" corev1 "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1beta1" + discovery "k8s.io/api/discovery/v1" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" diff --git a/pkg/agent/proxy/endpointslicecache.go b/pkg/agent/proxy/endpointslicecache.go index 296babf1ec7..af418e6b430 100644 --- a/pkg/agent/proxy/endpointslicecache.go +++ b/pkg/agent/proxy/endpointslicecache.go @@ -44,13 +44,14 @@ import ( "strings" "sync" - v1 "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1beta1" + discovery "k8s.io/api/discovery/v1" apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" "antrea.io/antrea/pkg/agent/proxy/types" + "antrea.io/antrea/pkg/features" "antrea.io/antrea/third_party/proxy" ) @@ -93,10 +94,16 @@ type endpointSliceInfo struct { // endpointInfo contains just the attributes kube-proxy cares about. // Used for caching. Intentionally small to limit memory util. -// Addresses and Topology are copied from EndpointSlice Endpoints. +// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints. type endpointInfo struct { Addresses []string - Topology map[string]string + NodeName *string + Zone *string + ZoneHints sets.String + + Ready bool + Serving bool + Terminating bool } // spToEndpointMap stores groups Endpoint objects by ServicePortName and @@ -134,12 +141,27 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) * if !remove { for _, endpoint := range endpointSlice.Endpoints { - if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready { - esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{ - Addresses: endpoint.Addresses, - Topology: endpoint.Topology, - }) + epInfo := &endpointInfo{ + Addresses: endpoint.Addresses, + Zone: endpoint.Zone, + NodeName: endpoint.NodeName, + + // conditions + Ready: endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready, + Serving: endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving, + Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating, + } + + if features.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { + if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { + epInfo.ZoneHints = sets.String{} + for _, zone := range endpoint.Hints.ForZones { + epInfo.ZoneHints.Insert(zone.Name) + } + } } + + esInfo.Endpoints = append(esInfo.Endpoints, epInfo) } sort.Sort(byAddress(esInfo.Endpoints)) @@ -152,7 +174,7 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) * func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool { serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) if err != nil { - klog.Warningf("Error getting endpoint slice cache keys: %v", err) + klog.ErrorS(err, "Error getting endpoint slice cache keys") return false } @@ -236,15 +258,15 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN apimachiner Protocol: *port.Protocol, } - endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints) + endpointInfoBySP[svcPortName] = cache.addEndpoints(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints) } } return endpointInfoBySP } -// addEndpointsByIP adds endpointInfo for each IP. -func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN apimachinerytypes.NamespacedName, portNum int, endpointsByIP map[string]proxy.Endpoint, endpoints []*endpointInfo) map[string]proxy.Endpoint { +// addEndpoints adds endpointInfo for each IP. +func (cache *EndpointSliceCache) addEndpoints(serviceNN apimachinerytypes.NamespacedName, portNum int, endpointsByIP map[string]proxy.Endpoint, endpoints []*endpointInfo) map[string]proxy.Endpoint { if endpointsByIP == nil { endpointsByIP = map[string]proxy.Endpoint{} } @@ -252,7 +274,7 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN apimachinerytypes.Na // iterate through endpoints to add them to endpointsByIP. for _, endpoint := range endpoints { if len(endpoint.Addresses) == 0 { - klog.Warningf("Ignoring invalid endpoint port %s with empty addresses", endpoint) + klog.ErrorS(nil, "Ignoring invalid endpoint port with empty address", "endpoint", endpoint) continue } @@ -261,10 +283,20 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN apimachinerytypes.Na if cache.isIPv6Mode && utilnet.IsIPv6String(endpoint.Addresses[0]) != cache.isIPv6Mode { continue } + isLocal := false + nodeName := "" + if endpoint.NodeName != nil { + isLocal = cache.isLocal(*endpoint.NodeName) + nodeName = *endpoint.NodeName + } - isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname]) - endpointInfo := proxy.NewBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology) + zone := "" + if endpoint.Zone != nil { + zone = *endpoint.Zone + } + endpointInfo := proxy.NewBaseEndpointInfo(endpoint.Addresses[0], nodeName, zone, portNum, isLocal, + endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints) // This logic ensures we're deduping potential overlapping endpoints // isLocal should not vary between matching IPs, but if it does, we // favor a true value here if it exists. diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 32c3ab3b57a..2c22baeb7b7 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -18,13 +18,14 @@ import ( "fmt" "math" "net" + "reflect" "sort" "strings" "sync" "time" corev1 "k8s.io/api/core/v1" - "k8s.io/api/discovery/v1beta1" + discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" k8sapitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -76,12 +77,14 @@ type proxier struct { endpointSliceConfig *config.EndpointSliceConfig endpointsConfig *config.EndpointsConfig serviceConfig *config.ServiceConfig + nodeConfig *config.NodeConfig // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since last syncProxyRules call. For a single object, // changes are accumulated. Once both endpointsChanges and serviceChanges // have been synced, syncProxyRules will start syncing rules to OVS. endpointsChanges *endpointsChangesTracker serviceChanges *serviceChangesTracker + nodeLabels map[string]string // serviceMap stores services we expect to be installed. serviceMap k8sproxy.ServiceMap // serviceInstalledMap stores services we actually installed. @@ -91,7 +94,7 @@ type proxier struct { // endpointsInstalledMap stores endpoints we actually installed. endpointsInstalledMap types.EndpointsMap // serviceEndpointsMapsMutex protects serviceMap, serviceInstalledMap, - // endpointsMap, and endpointsInstalledMap, which can be read by + // endpointsMap, nodeLabels, and endpointsInstalledMap, which can be read by // GetServiceFlowKeys() called by the "/ovsflows" API handler. serviceEndpointsMapsMutex sync.Mutex // endpointReferenceCounter stores the number of times an Endpoint is referenced by Services. @@ -109,16 +112,18 @@ type proxier struct { syncedOnce bool syncedOnceMutex sync.RWMutex - runner *k8sproxy.BoundedFrequencyRunner - stopChan <-chan struct{} - ofClient openflow.Client - routeClient route.Interface - nodePortAddresses []net.IP - hostGateWay string - isIPv6 bool - proxyAll bool - endpointSliceEnabled bool - proxyLoadBalancerIPs bool + runner *k8sproxy.BoundedFrequencyRunner + stopChan <-chan struct{} + ofClient openflow.Client + routeClient route.Interface + nodePortAddresses []net.IP + hostGateWay string + hostname string + isIPv6 bool + proxyAll bool + endpointSliceEnabled bool + proxyLoadBalancerIPs bool + topologyAwareHintsEnabled bool } func (p *proxier) SyncedOnce() bool { @@ -357,6 +362,9 @@ func (p *proxier) installServices() { p.endpointsInstalledMap[svcPortName] = endpointsInstalled } endpoints := p.endpointsMap[svcPortName] + if p.topologyAwareHintsEnabled { + endpoints = filterEndpoints(endpoints, svcInfo, p.nodeLabels) + } // If both expected Endpoints number and installed Endpoints number are 0, we don't need to take care of this Service. if len(endpoints) == 0 && len(endpointsInstalled) == 0 { continue @@ -744,19 +752,19 @@ func (p *proxier) OnEndpointsSynced() { } } -func (p *proxier) OnEndpointSliceAdd(endpointSlice *v1beta1.EndpointSlice) { +func (p *proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) { if p.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, false) && p.isInitialized() { p.runner.Run() } } -func (p *proxier) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *v1beta1.EndpointSlice) { +func (p *proxier) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) { if p.endpointsChanges.OnEndpointSliceUpdate(newEndpointSlice, false) && p.isInitialized() { p.runner.Run() } } -func (p *proxier) OnEndpointSliceDelete(endpointSlice *v1beta1.EndpointSlice) { +func (p *proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) { if p.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, true) && p.isInitialized() { p.runner.Run() } @@ -797,6 +805,68 @@ func (p *proxier) OnServiceSynced() { } } +// OnNodeAdd is called whenever creation of new node object +// is observed. +func (p *proxier) OnNodeAdd(node *corev1.Node) { + if node.Name != p.hostname { + return + } + + if reflect.DeepEqual(p.nodeLabels, node.Labels) { + return + } + + p.serviceEndpointsMapsMutex.Lock() + p.nodeLabels = map[string]string{} + for k, v := range node.Labels { + p.nodeLabels[k] = v + } + p.serviceEndpointsMapsMutex.Unlock() + klog.V(4).InfoS("Updated proxier Node labels", "labels", node.Labels) + + p.syncProxyRules() +} + +// OnNodeUpdate is called whenever modification of an existing +// node object is observed. +func (p *proxier) OnNodeUpdate(oldNode, node *corev1.Node) { + if node.Name != p.hostname { + return + } + + if reflect.DeepEqual(p.nodeLabels, node.Labels) { + return + } + + p.serviceEndpointsMapsMutex.Lock() + p.nodeLabels = map[string]string{} + for k, v := range node.Labels { + p.nodeLabels[k] = v + } + p.serviceEndpointsMapsMutex.Unlock() + klog.V(4).InfoS("Updated proxier Node labels", "labels", node.Labels) + + p.syncProxyRules() +} + +// OnNodeDelete is called whenever deletion of an existing node +// object is observed. +func (p *proxier) OnNodeDelete(node *corev1.Node) { + if node.Name != p.hostname { + return + } + p.serviceEndpointsMapsMutex.Lock() + p.nodeLabels = nil + p.serviceEndpointsMapsMutex.Unlock() + + p.syncProxyRules() +} + +// OnNodeSynced is called once all the initial event handlers were +// called and the state is fully propagated to local cache. +func (p *proxier) OnNodeSynced() { +} + func (p *proxier) GetServiceByIP(serviceStr string) (k8sproxy.ServicePortName, bool) { p.serviceStringMapMutex.Lock() defer p.serviceStringMapMutex.Unlock() @@ -900,39 +970,47 @@ func NewProxier( klog.V(2).Infof("Creating proxier with IPv6 enabled=%t", isIPv6) endpointSliceEnabled := features.DefaultFeatureGate.Enabled(features.EndpointSlice) + topologyAwareHintsEnabled := features.DefaultFeatureGate.Enabled(features.TopologyAwareHints) ipFamily := corev1.IPv4Protocol if isIPv6 { ipFamily = corev1.IPv6Protocol } p := &proxier{ - endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), - serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), - endpointsChanges: newEndpointsChangesTracker(hostname, endpointSliceEnabled, isIPv6), - serviceChanges: newServiceChangesTracker(recorder, ipFamily, skipServices), - serviceMap: k8sproxy.ServiceMap{}, - serviceInstalledMap: k8sproxy.ServiceMap{}, - endpointsInstalledMap: types.EndpointsMap{}, - endpointsMap: types.EndpointsMap{}, - endpointReferenceCounter: map[string]int{}, - serviceStringMap: map[string]k8sproxy.ServicePortName{}, - oversizeServiceSet: sets.NewString(), - groupCounter: groupCounter, - ofClient: ofClient, - routeClient: routeClient, - nodePortAddresses: nodePortAddresses, - isIPv6: isIPv6, - proxyAll: proxyAllEnabled, - endpointSliceEnabled: endpointSliceEnabled, - proxyLoadBalancerIPs: proxyLoadBalancerIPs, + endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), + serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), + endpointsChanges: newEndpointsChangesTracker(hostname, endpointSliceEnabled, isIPv6), + serviceChanges: newServiceChangesTracker(recorder, ipFamily, skipServices), + serviceMap: k8sproxy.ServiceMap{}, + serviceInstalledMap: k8sproxy.ServiceMap{}, + endpointsInstalledMap: types.EndpointsMap{}, + endpointsMap: types.EndpointsMap{}, + endpointReferenceCounter: map[string]int{}, + nodeLabels: map[string]string{}, + serviceStringMap: map[string]k8sproxy.ServicePortName{}, + oversizeServiceSet: sets.NewString(), + groupCounter: groupCounter, + ofClient: ofClient, + routeClient: routeClient, + nodePortAddresses: nodePortAddresses, + isIPv6: isIPv6, + proxyAll: proxyAllEnabled, + endpointSliceEnabled: endpointSliceEnabled, + topologyAwareHintsEnabled: topologyAwareHintsEnabled, + proxyLoadBalancerIPs: proxyLoadBalancerIPs, + hostname: hostname, } p.serviceConfig.RegisterEventHandler(p) p.endpointsConfig.RegisterEventHandler(p) p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2) if endpointSliceEnabled { - p.endpointSliceConfig = config.NewEndpointSliceConfig(informerFactory.Discovery().V1beta1().EndpointSlices(), resyncPeriod) + p.endpointSliceConfig = config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), resyncPeriod) p.endpointSliceConfig.RegisterEventHandler(p) + if p.topologyAwareHintsEnabled { + p.nodeConfig = config.NewNodeConfig(informerFactory.Core().V1().Nodes(), resyncPeriod) + p.nodeConfig.RegisterEventHandler(p) + } } else { p.endpointsConfig = config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod) p.endpointsConfig.RegisterEventHandler(p) diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 047252f3884..78f17b93fd5 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -215,10 +215,10 @@ func testClusterIP(t *testing.T, svcIP net.IP, ep1IP, ep2IP net.IP, isIPv6, node allEps := append(extraEps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, allEps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), svcPort, true, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} expectedAllEps := expectedLocalEps if !nodeLocalInternal { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), svcPort, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) } bindingProtocol := binding.ProtocolTCP @@ -307,10 +307,10 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep eps := []*corev1.Endpoints{makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)} makeEndpointsMap(fp, eps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), svcPort, true, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} expectedAllEps := expectedLocalEps if !(nodeLocalInternal && nodeLocalExternal) { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), svcPort, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) } bindingProtocol := binding.ProtocolTCP @@ -430,10 +430,10 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, eps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), svcPort, true, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} expectedAllEps := expectedLocalEps if !(nodeLocalInternal && nodeLocalExternal) { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), svcPort, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) } bindingProtocol := binding.ProtocolTCP diff --git a/pkg/agent/proxy/topology.go b/pkg/agent/proxy/topology.go new file mode 100644 index 00000000000..1777b38e7ce --- /dev/null +++ b/pkg/agent/proxy/topology.go @@ -0,0 +1,67 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + k8sproxy "antrea.io/antrea/third_party/proxy" +) + +func filterEndpoints(endpoints map[string]k8sproxy.Endpoint, svcInfo k8sproxy.ServicePort, nodeLabels map[string]string) map[string]k8sproxy.Endpoint { + if svcInfo.NodeLocalExternal() || svcInfo.NodeLocalInternal() { + return endpoints + } + + return filterEndpointsWithHints(endpoints, svcInfo.HintsAnnotation(), nodeLabels) +} + +func filterEndpointsWithHints(endpoints map[string]k8sproxy.Endpoint, hintsAnnotation string, nodeLabels map[string]string) map[string]k8sproxy.Endpoint { + if hintsAnnotation != "Auto" && hintsAnnotation != "auto" { + if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" { + klog.InfoS("Skipping topology aware Endpoint filtering since Service has unexpected value", "annotationTopologyAwareHints", v1.AnnotationTopologyAwareHints, "hints", hintsAnnotation) + } + return endpoints + } + + zone, ok := nodeLabels[v1.LabelTopologyZone] + if !ok || zone == "" { + klog.InfoS("Skipping topology aware Endpoint filtering since Node is missing label", "label", v1.LabelTopologyZone) + return endpoints + } + + filteredEndpoints := make(map[string]k8sproxy.Endpoint) + + for key, endpoint := range endpoints { + if !endpoint.IsReady() { + continue + } + if endpoint.GetZoneHints().Len() == 0 { + klog.InfoS("Skipping topology aware Endpoint filtering since one or more Endpoints is missing a zone hint") + return endpoints + } + if endpoint.GetZoneHints().Has(zone) { + filteredEndpoints[key] = endpoint + } + } + + if len(filteredEndpoints) == 0 { + klog.InfoS("Skipping topology aware Endpoint filtering since no hints were provided for zone", "zone", zone) + return endpoints + } + + return filteredEndpoints +} diff --git a/pkg/agent/proxy/topology_test.go b/pkg/agent/proxy/topology_test.go new file mode 100644 index 00000000000..098f676065d --- /dev/null +++ b/pkg/agent/proxy/topology_test.go @@ -0,0 +1,123 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + + k8sproxy "antrea.io/antrea/third_party/proxy" +) + +func TestTopologyAwareHints(t *testing.T) { + testCases := []struct { + name string + nodeLabels map[string]string + hintsAnnotation string + endpoints map[string]k8sproxy.Endpoint + expectedEndpoints sets.String + }{ + { + name: "hints annotation == auto", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "auto", + endpoints: map[string]k8sproxy.Endpoint{ + "10.1.2.3:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + "10.1.2.4:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, + "10.1.2.5:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, + "10.1.2.6:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + }, + { + name: "hints annotation == disabled, hints ignored", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "disabled", + endpoints: map[string]k8sproxy.Endpoint{ + "10.1.2.3:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + "10.1.2.4:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, + "10.1.2.5:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, + "10.1.2.6:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + }, + { + name: "hints annotation == aUto (wrong capitalization), hints ignored", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "aUto", + endpoints: map[string]k8sproxy.Endpoint{ + "10.1.2.3:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + "10.1.2.4:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, + "10.1.2.5:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, + "10.1.2.6:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + }, + { + name: "hints annotation empty, hints ignored", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "", + endpoints: map[string]k8sproxy.Endpoint{ + "10.1.2.3:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + "10.1.2.4:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, + "10.1.2.5:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, + "10.1.2.6:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + }, + { + name: "empty node labels", + nodeLabels: nil, + hintsAnnotation: "auto", + endpoints: map[string]k8sproxy.Endpoint{ + "10.1.2.3:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + expectedEndpoints: sets.NewString("10.1.2.3:80"), + }, + { + name: "empty zone label", + nodeLabels: map[string]string{v1.LabelTopologyZone: ""}, + hintsAnnotation: "auto", + endpoints: map[string]k8sproxy.Endpoint{ + "10.1.2.3:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + expectedEndpoints: sets.NewString("10.1.2.3:80"), + }, + { + name: "node in different zone, no endpoint filtering", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-b"}, + hintsAnnotation: "auto", + endpoints: map[string]k8sproxy.Endpoint{ + "10.1.2.3:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + "10.1.2.5:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, + }, + expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.5:80"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + endpointsMap := filterEndpointsWithHints(tc.endpoints, tc.hintsAnnotation, tc.nodeLabels) + endpoints := sets.NewString() + for key := range endpointsMap { + endpoints.Insert(key) + } + assert.Equal(t, tc.expectedEndpoints, endpoints) + }) + } +} diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 0355d10276e..2801202ff9f 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -41,6 +41,11 @@ const ( // flag will not take effect. EndpointSlice featuregate.Feature = "EndpointSlice" + // alpha: v1.8 + // Enable TopologyAwareHints in AntreaProxy. If EndpointSlice is not enabled, this + // flag will not take effect. + TopologyAwareHints featuregate.Feature = "TopologyAwareHints" + // alpha: v0.8 // beta: v0.11 // Enable antrea proxy which provides ServiceLB for in-cluster services in antrea agent. @@ -121,6 +126,7 @@ var ( AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, Egress: {Default: true, PreRelease: featuregate.Beta}, EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, + TopologyAwareHints: {Default: false, PreRelease: featuregate.Alpha}, Traceflow: {Default: true, PreRelease: featuregate.Beta}, AntreaIPAM: {Default: false, PreRelease: featuregate.Alpha}, FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/third_party/proxy/config/config.go b/third_party/proxy/config/config.go index 728c845bca6..c37107d582b 100644 --- a/third_party/proxy/config/config.go +++ b/third_party/proxy/config/config.go @@ -39,10 +39,10 @@ import ( "time" v1 "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1beta1" + discovery "k8s.io/api/discovery/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" - discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" ) @@ -370,3 +370,112 @@ func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) { h.OnEndpointSliceDelete(endpointSlice) } } + +// NodeHandler is an abstract interface of objects which receive +// notifications about node object changes. +type NodeHandler interface { + // OnNodeAdd is called whenever creation of new node object + // is observed. + OnNodeAdd(node *v1.Node) + // OnNodeUpdate is called whenever modification of an existing + // node object is observed. + OnNodeUpdate(oldNode, node *v1.Node) + // OnNodeDelete is called whenever deletion of an existing node + // object is observed. + OnNodeDelete(node *v1.Node) + // OnNodeSynced is called once all the initial event handlers were + // called and the state is fully propagated to local cache. + OnNodeSynced() +} + +// NodeConfig tracks a set of node configurations. +// It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change. +type NodeConfig struct { + listerSynced cache.InformerSynced + eventHandlers []NodeHandler +} + +// NewNodeConfig creates a new NodeConfig. +func NewNodeConfig(nodeInformer coreinformers.NodeInformer, resyncPeriod time.Duration) *NodeConfig { + result := &NodeConfig{ + listerSynced: nodeInformer.Informer().HasSynced, + } + + nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddNode, + UpdateFunc: result.handleUpdateNode, + DeleteFunc: result.handleDeleteNode, + }, + resyncPeriod, + ) + + return result +} + +// RegisterEventHandler registers a handler which is called on every node change. +func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + +// Run starts the goroutine responsible for calling registered handlers. +func (c *NodeConfig) Run(stopCh <-chan struct{}) { + klog.InfoS("Starting node config controller") + + if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) { + return + } + + for i := range c.eventHandlers { + klog.V(3).InfoS("Calling handler.OnNodeSynced()") + c.eventHandlers[i].OnNodeSynced() + } +} + +func (c *NodeConfig) handleAddNode(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + for i := range c.eventHandlers { + klog.V(4).InfoS("Calling handler.OnNodeAdd") + c.eventHandlers[i].OnNodeAdd(node) + } +} + +func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) { + oldNode, ok := oldObj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj)) + return + } + node, ok := newObj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) + return + } + for i := range c.eventHandlers { + klog.V(5).InfoS("Calling handler.OnNodeUpdate") + c.eventHandlers[i].OnNodeUpdate(oldNode, node) + } +} + +func (c *NodeConfig) handleDeleteNode(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + if node, ok = tombstone.Obj.(*v1.Node); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + } + for i := range c.eventHandlers { + klog.V(4).InfoS("Calling handler.OnNodeDelete") + c.eventHandlers[i].OnNodeDelete(node) + } +} diff --git a/third_party/proxy/endpoints.go b/third_party/proxy/endpoints.go index 0de50bce9f7..e21e38731c7 100644 --- a/third_party/proxy/endpoints.go +++ b/third_party/proxy/endpoints.go @@ -43,6 +43,8 @@ import ( "net" "strconv" + "k8s.io/apimachinery/pkg/util/sets" + utilproxy "antrea.io/antrea/third_party/proxy/util" ) @@ -53,8 +55,32 @@ import ( type BaseEndpointInfo struct { Endpoint string // TODO: should be an endpointString type // IsLocal indicates whether the endpoint is running in same host as kube-proxy. - IsLocal bool - Topology map[string]string + IsLocal bool + + // ZoneHints represent the zone hints for the endpoint. This is based on + // endpoint.hints.forZones[*].name in the EndpointSlice API. + ZoneHints sets.String + // Ready indicates whether this endpoint is ready and NOT terminating. + // For pods, this is true if a pod has a ready status and a nil deletion timestamp. + // This is only set when watching EndpointSlices. If using Endpoints, this is always + // true since only ready endpoints are read from Endpoints. + // TODO: Ready can be inferred from Serving and Terminating below when enabled by default. + Ready bool + // Serving indiciates whether this endpoint is ready regardless of its terminating state. + // For pods this is true if it has a ready status regardless of its deletion timestamp. + // This is only set when watching EndpointSlices. If using Endpoints, this is always + // true since only ready endpoints are read from Endpoints. + Serving bool + // Terminating indicates whether this endpoint is terminating. + // For pods this is true if it has a non-nil deletion timestamp. + // This is only set when watching EndpointSlices. If using Endpoints, this is always + // false since terminating endpoints are always excluded from Endpoints. + Terminating bool + + // NodeName is the name of the node this endpoint belongs to + NodeName string + // Zone is the name of the zone this endpoint belongs to + Zone string } var _ Endpoint = &BaseEndpointInfo{} @@ -69,9 +95,26 @@ func (info *BaseEndpointInfo) GetIsLocal() bool { return info.IsLocal } -// GetTopology returns the topology information of the endpoint. -func (info *BaseEndpointInfo) GetTopology() map[string]string { - return info.Topology +// IsReady returns true if an endpoint is ready and not terminating. +func (info *BaseEndpointInfo) IsReady() bool { + return info.Ready +} + +// IsServing returns true if an endpoint is ready, regardless of if the +// endpoint is terminating. +func (info *BaseEndpointInfo) IsServing() bool { + return info.Serving +} + +// IsTerminating retruns true if an endpoint is terminating. For pods, +// that is any pod with a deletion timestamp. +func (info *BaseEndpointInfo) IsTerminating() bool { + return info.Terminating +} + +// GetZoneHints returns the zone hint for the endpoint. +func (info *BaseEndpointInfo) GetZoneHints() sets.String { + return info.ZoneHints } // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. @@ -86,13 +129,31 @@ func (info *BaseEndpointInfo) Port() (int, error) { // Equal is part of proxy.Endpoint interface. func (info *BaseEndpointInfo) Equal(other Endpoint) bool { - return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() + return info.String() == other.String() && + info.GetIsLocal() == other.GetIsLocal() && + info.IsReady() == other.IsReady() +} + +// GetNodeName returns the NodeName for this endpoint. +func (info *BaseEndpointInfo) GetNodeName() string { + return info.NodeName +} + +// GetZone returns the Zone for this endpoint. +func (info *BaseEndpointInfo) GetZone() string { + return info.Zone } -func NewBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string) *BaseEndpointInfo { +func NewBaseEndpointInfo(IP, nodeName, zone string, port int, isLocal bool, + ready, serving, terminating bool, zoneHints sets.String) *BaseEndpointInfo { return &BaseEndpointInfo{ - Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), - IsLocal: isLocal, - Topology: topology, + Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), + IsLocal: isLocal, + Ready: ready, + Serving: serving, + Terminating: terminating, + ZoneHints: zoneHints, + NodeName: nodeName, + Zone: zone, } } diff --git a/third_party/proxy/types.go b/third_party/proxy/types.go index f224470bd63..f6bc9cffeb8 100644 --- a/third_party/proxy/types.go +++ b/third_party/proxy/types.go @@ -43,6 +43,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "antrea.io/antrea/third_party/proxy/config" ) @@ -117,14 +118,33 @@ type Endpoint interface { String() string // GetIsLocal returns true if the endpoint is running in same host as kube-proxy, otherwise returns false. GetIsLocal() bool - // GetTopology returns the topology information of the endpoint. - GetTopology() map[string]string + // IsReady returns true if an endpoint is ready and not terminating. + // This is only set when watching EndpointSlices. If using Endpoints, this is always + // true since only ready endpoints are read from Endpoints. + IsReady() bool + // IsServing returns true if an endpoint is ready. It does not account + // for terminating state. + // This is only set when watching EndpointSlices. If using Endpoints, this is always + // true since only ready endpoints are read from Endpoints. + IsServing() bool + // IsTerminating retruns true if an endpoint is terminating. For pods, + // that is any pod with a deletion timestamp. + // This is only set when watching EndpointSlices. If using Endpoints, this is always + // false since terminating endpoints are always excluded from Endpoints. + IsTerminating() bool + // GetZoneHint returns the zone hint for the endpoint. This is based on + // endpoint.hints.forZones[0].name in the EndpointSlice API. + GetZoneHints() sets.String // IP returns IP part of the endpoint. IP() string // Port returns the Port part of the endpoint. Port() (int, error) // Equal checks if two endpoints are equal. Equal(Endpoint) bool + // GetNodeName returns the node name for the endpoint + GetNodeName() string + // GetZone returns the zone for the endpoint + GetZone() string } // ServiceEndpoint is used to identify a service and one of its endpoint pair.