diff --git a/Makefile b/Makefile index e80f0f973f8..53f45e36072 100644 --- a/Makefile +++ b/Makefile @@ -152,6 +152,7 @@ endif -v $(CURDIR)/.coverage:/usr/src/github.com/vmware-tanzu/antrea/.coverage \ -v $(CURDIR):/usr/src/github.com/vmware-tanzu/antrea:ro \ -v /lib/modules:/lib/modules \ + --sysctl net.ipv6.conf.all.disable_ipv6=0 \ antrea/test test-integration $(USERID) $(GRPID) .PHONY: docker-tidy diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index dcf6a0fe39b..3b21e07a4fb 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -989,6 +989,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1283,6 +1291,11 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice + # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # this flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1393,15 +1406,6 @@ data: # Provide the address of Kubernetes apiserver, to override any value provided in kubeconfig or InClusterConfig. # Defaults to "". It must be a host string, a host:port pair, or a URL to the base of the apiserver. #kubeAPIServerOverride: "" - - # Comma-separated list of Cipher Suites. If omitted, the default Go Cipher Suites will be used. - # https://golang.org/pkg/crypto/tls/#pkg-constants - # Note that TLS1.3 Cipher Suites cannot be added to the list. But the apiserver will always - # prefer TLS1.3 Cipher Suites whenever possible. - #tlsCipherSuites: - - # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. - #tlsMinVersion: antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1467,7 +1471,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-gt6f55df69 + name: antrea-config-824k5kcghd namespace: kube-system --- apiVersion: v1 @@ -1578,7 +1582,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-gt6f55df69 + name: antrea-config-824k5kcghd name: antrea-config - name: antrea-controller-tls secret: @@ -1842,7 +1846,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-gt6f55df69 + name: antrea-config-824k5kcghd name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index d5f0a610fd7..015f30fec54 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -989,6 +989,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1283,6 +1291,11 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice + # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # this flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1467,7 +1480,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-gt6f55df69 + name: antrea-config-824k5kcghd namespace: kube-system --- apiVersion: v1 @@ -1578,7 +1591,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-gt6f55df69 + name: antrea-config-824k5kcghd name: antrea-config - name: antrea-controller-tls secret: @@ -1844,7 +1857,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-gt6f55df69 + name: antrea-config-824k5kcghd name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 4efcaa8548f..43fd13a73be 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -989,6 +989,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1283,6 +1291,11 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice + # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # this flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1467,7 +1480,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-56ghk45g94 + name: antrea-config-bmt74b6652 namespace: kube-system --- apiVersion: v1 @@ -1578,7 +1591,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-56ghk45g94 + name: antrea-config-bmt74b6652 name: antrea-config - name: antrea-controller-tls secret: @@ -1845,7 +1858,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-56ghk45g94 + name: antrea-config-bmt74b6652 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 7564572efed..d6461cfe61e 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -989,6 +989,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1283,6 +1291,11 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice + # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # this flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1472,7 +1485,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-c5f94kkkd9 + name: antrea-config-6k75ft5467 namespace: kube-system --- apiVersion: v1 @@ -1592,7 +1605,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-c5f94kkkd9 + name: antrea-config-6k75ft5467 name: antrea-config - name: antrea-controller-tls secret: @@ -1891,7 +1904,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-c5f94kkkd9 + name: antrea-config-6k75ft5467 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index dc06d9bdff4..5e7874c67f8 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -989,6 +989,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1283,6 +1291,11 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice + # API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, + # this flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1398,15 +1411,6 @@ data: # Provide the address of Kubernetes apiserver, to override any value provided in kubeconfig or InClusterConfig. # Defaults to "". It must be a host string, a host:port pair, or a URL to the base of the apiserver. #kubeAPIServerOverride: "" - - # Comma-separated list of Cipher Suites. If omitted, the default Go Cipher Suites will be used. - # https://golang.org/pkg/crypto/tls/#pkg-constants - # Note that TLS1.3 Cipher Suites cannot be added to the list. But the apiserver will always - # prefer TLS1.3 Cipher Suites whenever possible. - #tlsCipherSuites: - - # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. - #tlsMinVersion: antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1472,7 +1476,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-6h4c4bttfd + name: antrea-config-t9975t2t7c namespace: kube-system --- apiVersion: v1 @@ -1583,7 +1587,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-6h4c4bttfd + name: antrea-config-t9975t2t7c name: antrea-config - name: antrea-controller-tls secret: @@ -1847,7 +1851,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-6h4c4bttfd + name: antrea-config-t9975t2t7c name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/agent-rbac.yml b/build/yamls/base/agent-rbac.yml index 5a2e60bde33..09bb8c283a7 100644 --- a/build/yamls/base/agent-rbac.yml +++ b/build/yamls/base/agent-rbac.yml @@ -36,6 +36,14 @@ rules: - get - watch - list + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 2703afc6f75..82860650217 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -5,6 +5,11 @@ featureGates: # Service traffic. # AntreaProxy: true +# Enable EndpointSlice support in AntreaProxy. Don't enable this feature unless that EndpointSlice +# API version v1beta1 is supported and set as enabled in Kubernetes. If AntreaProxy is not enabled, +# this flag will not take effect. +# EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 0cfa7fcc22b..03b99a2bf7c 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -25,6 +25,7 @@ function echoerr { _usage="Usage: $0 [--encap-mode ] [--no-proxy] [--np] [--coverage] [--help|-h] --encap-mode Traffic encapsulation mode. (default is 'encap') --no-proxy Disables Antrea proxy. + --endpointslice Enables Antrea proxy and EndpointSlice support --np Enables Namespaced Antrea NetworkPolicy CRDs and ClusterNetworkPolicy related CRDs. --coverage Enables measure Antrea code coverage when run e2e tests on kind. --help, -h Print this message and exit @@ -49,6 +50,7 @@ trap "quit" INT EXIT mode="" proxy=true +endpointslice=false np=false coverage=false while [[ $# -gt 0 ]] @@ -60,6 +62,10 @@ case $key in proxy=false shift ;; + --endpointslice) + endpointslice=true + shift + ;; --np) np=true shift @@ -87,6 +93,9 @@ manifest_args="" if ! $proxy; then manifest_args="$manifest_args --no-proxy" fi +if $endpointslice; then + manifest_args="$manifest_args --endpointslice" +fi if $np; then # See https://github.com/vmware-tanzu/antrea/issues/897 manifest_args="$manifest_args --np --tun vxlan" diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 9e186e9efac..b25f324de4f 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -187,6 +187,7 @@ func run(o *Options) error { } var proxier k8sproxy.Provider + if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) diff --git a/docs/feature-gates.md b/docs/feature-gates.md index 9ee1abca8ef..2cbe78db56a 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -36,6 +36,7 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the | Feature Name | Component | Default | Stage | Alpha Release | Beta Release | GA Release | Extra Requirements | Notes | | ----------------------- | ------------------ | ------- | ----- | ------------- | ------------ | ---------- | ------------------ | ----- | | `AntreaProxy` | Agent | `false` | Alpha | v0.8 | v0.11 | N/A | Yes | Must be enabled for Windows. | +| `EndpointSlice` | Agent | `false` | Alpha | v0.13.0 | N/A | N/A | Yes | | | `AntreaPolicy` | Agent + Controller | `false` | Alpha | v0.8 | N/A | N/A | No | Agent side config required from v0.9.0+. | | `Traceflow` | Agent + Controller | `false` | Alpha | v0.8 | v0.11 | N/A | Yes | | | `FlowExporter` | Agent | `false` | Alpha | v0.9 | N/A | N/A | Yes | | @@ -55,6 +56,19 @@ manifest provided as part of releases enables this feature by default. If you edit the manifest, make sure you do not disable it, as it is needed for correct NetworkPolicy implementation for Pod-to-Service traffic. +### EndpointSlice + +`EndpointSlice` enables Service EndpointSlice support in AntreaProxy. The +EndpointSlice API was introduced in Kubernetes 1.16 (alpha) and it is enabled +by default in Kubernetes 1.17 (beta). This flag will take no effect if AntreaProxy +is not enabled. The endpoint conditions of `Serving` and `Terminating` are not +supported currently. ServiceTopology is not supported either. Refer to this [link](https://kubernetes.io/docs/tasks/administer-cluster/enabling-endpointslices/) +for more information. EndpointSlice API version that AntreaProxy supports is v1beta1 +currently, and other EndpointSlice API versions are not supported. If EndpointSlice is +enabled in AntreaProxy, but EndpointSlice API is disabled in Kubernetes or EndpointSlice +API version v1beta1 is not supported in Kubernetes, error messages will be logged by +Antrea Agents and AntreaProxy will not implement Cluster IP functionality as expected. + #### Requirements for this Feature When using the OVS built-in kernel module (which is the most common case), your diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index 2b6adb31b4d..a00daa013e1 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -29,6 +29,7 @@ Generate a YAML manifest for Antrea using Kustomize and print it to stdout. --ipsec Generate a manifest with IPSec encryption of tunnel traffic enabled --all-features Generate a manifest with all alpha features enabled --no-proxy Generate a manifest with Antrea proxy disabled + --endpointslice Generate a manifest with EndpointSlice support enabled --np Generate a manifest with ClusterNetworkPolicy and Antrea NetworkPolicy features enabled --k8s-1.15 Generates a manifest which supports Kubernetes 1.15. --keep Debug flag which will preserve the generated kustomization.yml @@ -62,6 +63,7 @@ KIND=false IPSEC=false ALLFEATURES=false PROXY=true +ENDPOINTSLICE=false NP=false KEEP=false ENCAP_MODE="" @@ -106,6 +108,11 @@ case $key in PROXY=false shift ;; + --endpointslice) + PROXY=true + ENDPOINTSLICE=true + shift + ;; --np) NP=true shift @@ -149,6 +156,12 @@ case $key in esac done +if [ "$PROXY" == false ] && [ "$ENDPOINTSLICE" == true ]; then + echoerr "--endpointslice requires AntreaProxy and therefore cannot be used with --no-proxy" + print_help + exit 1 +fi + if [ "$MODE" != "dev" ] && [ "$MODE" != "release" ]; then echoerr "--mode must be one of 'dev' or 'release'" print_help @@ -230,12 +243,17 @@ if $ALLFEATURES; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*FlowExporter[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ FlowExporter: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*NetworkPolicyStats[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ NetworkPolicyStats: true/" antrea-agent.conf + sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*EndpointSlice[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ EndpointSlice: true/" antrea-agent.conf fi if ! $PROXY; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaProxy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaProxy: false/" antrea-agent.conf fi +if $ENDPOINTSLICE; then + sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*EndpointSlice[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ EndpointSlice: true/" antrea-agent.conf +fi + if $NP; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-controller.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-agent.conf diff --git a/pkg/agent/proxy/endpoints.go b/pkg/agent/proxy/endpoints.go index f4f72443555..74c4d81a028 100644 --- a/pkg/agent/proxy/endpoints.go +++ b/pkg/agent/proxy/endpoints.go @@ -21,6 +21,7 @@ import ( "sync" corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/klog" @@ -28,6 +29,12 @@ import ( k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) +var supportedEndpointSliceAddressTypes = map[discovery.AddressType]struct{}{ + discovery.AddressTypeIP: {}, // IP is a deprecated address type + discovery.AddressTypeIPv4: {}, + discovery.AddressTypeIPv6: {}, +} + // endpointsChange describes an Endpoints change, previous is the state from before // all of them, current is state after applying all of those. type endpointsChange struct { @@ -44,17 +51,23 @@ type endpointsChangesTracker struct { // initialized tells whether Endpoints have been synced. initialized bool // changes contains endpoints changes since the last checkoutChanges call. - changes map[apimachinerytypes.NamespacedName]*endpointsChange + changes map[apimachinerytypes.NamespacedName]*endpointsChange + sliceCache *EndpointSliceCache } -func newEndpointsChangesTracker(hostname string) *endpointsChangesTracker { - return &endpointsChangesTracker{ +func newEndpointsChangesTracker(hostname string, enableEndpointSlice bool, isIPv6 bool) *endpointsChangesTracker { + tracker := &endpointsChangesTracker{ hostname: hostname, changes: map[apimachinerytypes.NamespacedName]*endpointsChange{}, } + + if enableEndpointSlice { + tracker.sliceCache = NewEndpointSliceCache(hostname, isIPv6) + } + return tracker } -// OnEndpointUpdate updates given Service's Endpoints change map based on the +// OnEndpointUpdate updates the given Service's endpointsChange map based on the // Endpoints pair. It returns true if items changed, // otherwise it returns false. // Update can be used to add/update/delete items of EndpointsChangeMap. @@ -95,10 +108,42 @@ func (t *endpointsChangesTracker) OnEndpointUpdate(previous, current *corev1.End return len(t.changes) > 0 } +// EndpointSliceUpdate updates the given service's endpoints change map based on the endpoints pair. +// It returns true if items changed, otherwise it returns false. Will add/update/delete items of endpointsChange Map. +// If removeSlice is true, slice will be removed, otherwise it will be added or updated. +func (t *endpointsChangesTracker) OnEndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { + // This should never happen. + if endpointSlice == nil { + klog.Error("Nil endpointSlice passed to EndpointSliceUpdate") + return false + } + + if _, has := supportedEndpointSliceAddressTypes[endpointSlice.AddressType]; !has { + klog.V(4).Infof("EndpointSlice address type not supported: %s", endpointSlice.AddressType) + return false + } + + if _, _, err := endpointSliceCacheKeys(endpointSlice); err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return false + } + + t.Lock() + defer t.Unlock() + + changeNeeded := t.sliceCache.updatePending(endpointSlice, removeSlice) + + return changeNeeded +} + func (t *endpointsChangesTracker) checkoutChanges() []*endpointsChange { t.Lock() defer t.Unlock() + if t.sliceCache != nil { + return t.sliceCache.checkoutChanges() + } + var changes []*endpointsChange for _, change := range t.changes { changes = append(changes, change) diff --git a/pkg/agent/proxy/endpointslicecache.go b/pkg/agent/proxy/endpointslicecache.go new file mode 100644 index 00000000000..85ec2f4dc23 --- /dev/null +++ b/pkg/agent/proxy/endpointslicecache.go @@ -0,0 +1,367 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Original file https://raw.githubusercontent.com/kubernetes/kubernetes/0c0d4fea8dd6bdcd16b9e1d35da3f7d209341a6f/pkg/proxy/endpointslicecache.go +// If this file is located in third_party, there will be an import cycle issue when build Antrea as this file import +// "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types". +// Remove makeEndpointInfo and recorder in fields. +// Remove unused standardEndpointInfo. +// Remove unneeded sort.Sort in endpointsMapFromEndpointInfo. +// Update import paths. + +package proxy + +import ( + "fmt" + "reflect" + "sort" + "strings" + "sync" + + v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" + apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/klog" + utilnet "k8s.io/utils/net" + + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" + "github.com/vmware-tanzu/antrea/third_party/proxy" +) + +// EndpointSliceCache is used as a cache of EndpointSlice information. +type EndpointSliceCache struct { + // lock protects trackerByServiceMap. + lock sync.Mutex + + // trackerByServiceMap is the basis of this cache. It contains endpoint + // slice trackers grouped by service name and endpoint slice name. The first + // key represents a namespaced service name while the second key represents + // an endpoint slice name. Since endpoints can move between slices, we + // require slice specific caching to prevent endpoints being removed from + // the cache when they may have just moved to a different slice. + trackerByServiceMap map[apimachinerytypes.NamespacedName]*endpointSliceTracker + + hostname string + isIPv6Mode bool +} + +// endpointSliceTracker keeps track of EndpointSlices as they have been applied +// by a proxier along with any pending EndpointSlices that have been updated +// in this cache but not yet applied by a proxier. +type endpointSliceTracker struct { + applied endpointSliceInfoByName + pending endpointSliceInfoByName +} + +// endpointSliceInfoByName groups endpointSliceInfo by the names of the +// corresponding EndpointSlices. +type endpointSliceInfoByName map[string]*endpointSliceInfo + +// endpointSliceInfo contains just the attributes kube-proxy cares about. +// Used for caching. Intentionally small to limit memory util. +type endpointSliceInfo struct { + Ports []discovery.EndpointPort + Endpoints []*endpointInfo + Remove bool +} + +// endpointInfo contains just the attributes kube-proxy cares about. +// Used for caching. Intentionally small to limit memory util. +// Addresses and Topology are copied from EndpointSlice Endpoints. +type endpointInfo struct { + Addresses []string + Topology map[string]string +} + +// spToEndpointMap stores groups Endpoint objects by ServicePortName and +// EndpointSlice name. +type spToEndpointMap map[proxy.ServicePortName]map[string]proxy.Endpoint + +// NewEndpointSliceCache initializes an EndpointSliceCache. +func NewEndpointSliceCache(hostname string, isIPv6Mode bool) *EndpointSliceCache { + return &EndpointSliceCache{ + trackerByServiceMap: map[apimachinerytypes.NamespacedName]*endpointSliceTracker{}, + hostname: hostname, + isIPv6Mode: isIPv6Mode, + } +} + +// newEndpointSliceTracker initializes an endpointSliceTracker. +func newEndpointSliceTracker() *endpointSliceTracker { + return &endpointSliceTracker{ + applied: endpointSliceInfoByName{}, + pending: endpointSliceInfoByName{}, + } +} + +// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice. +func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo { + esInfo := &endpointSliceInfo{ + Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)), + Endpoints: []*endpointInfo{}, + Remove: remove, + } + + // copy here to avoid mutating shared EndpointSlice object. + copy(esInfo.Ports, endpointSlice.Ports) + sort.Sort(byPort(esInfo.Ports)) + + if !remove { + for _, endpoint := range endpointSlice.Endpoints { + if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready { + esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{ + Addresses: endpoint.Addresses, + Topology: endpoint.Topology, + }) + } + } + + sort.Sort(byAddress(esInfo.Endpoints)) + } + + return esInfo +} + +// updatePending updates a pending slice in the cache. +func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool { + serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return false + } + + esInfo := newEndpointSliceInfo(endpointSlice, remove) + + cache.lock.Lock() + defer cache.lock.Unlock() + + if _, ok := cache.trackerByServiceMap[serviceKey]; !ok { + cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker() + } + + changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo) + + if changed { + cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo + } + + return changed +} + +// checkoutChanges returns a list of all endpointsChanges that are +// pending and then marks them as applied. +func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange { + changes := []*endpointsChange{} + + cache.lock.Lock() + defer cache.lock.Unlock() + + for serviceNN, esTracker := range cache.trackerByServiceMap { + if len(esTracker.pending) == 0 { + continue + } + + change := &endpointsChange{} + + change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) + + for name, sliceInfo := range esTracker.pending { + if sliceInfo.Remove { + delete(esTracker.applied, name) + } else { + esTracker.applied[name] = sliceInfo + } + + delete(esTracker.pending, name) + } + + change.current = cache.getEndpointsMap(serviceNN, esTracker.applied) + changes = append(changes, change) + } + + return changes +} + +// getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices. +func (cache *EndpointSliceCache) getEndpointsMap(serviceNN apimachinerytypes.NamespacedName, sliceInfoByName endpointSliceInfoByName) types.EndpointsMap { + endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName) + return endpointsMapFromEndpointInfo(endpointInfoBySP) +} + +// endpointInfoByServicePort groups endpoint info by service port name and address. +func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN apimachinerytypes.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap { + endpointInfoBySP := spToEndpointMap{} + + for _, sliceInfo := range sliceInfoByName { + for _, port := range sliceInfo.Ports { + if port.Name == nil { + klog.Warningf("Ignoring port with nil name %v", port) + continue + } + // TODO: handle nil ports to mean "all" + if port.Port == nil || *port.Port == int32(0) { + klog.Warningf("Ignoring invalid endpoint port %s", *port.Name) + continue + } + + svcPortName := proxy.ServicePortName{ + NamespacedName: serviceNN, + Port: *port.Name, + Protocol: *port.Protocol, + } + + endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints) + } + } + + return endpointInfoBySP +} + +// addEndpointsByIP adds endpointInfo for each IP. +func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN apimachinerytypes.NamespacedName, portNum int, endpointsByIP map[string]proxy.Endpoint, endpoints []*endpointInfo) map[string]proxy.Endpoint { + if endpointsByIP == nil { + endpointsByIP = map[string]proxy.Endpoint{} + } + + // iterate through endpoints to add them to endpointsByIP. + for _, endpoint := range endpoints { + if len(endpoint.Addresses) == 0 { + klog.Warningf("Ignoring invalid endpoint port %s with empty addresses", endpoint) + continue + } + + // Filter out the incorrect IP version case. Any endpoint port that + // contains incorrect IP version will be ignored. + if cache.isIPv6Mode && utilnet.IsIPv6String(endpoint.Addresses[0]) != cache.isIPv6Mode { + continue + } + + isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname]) + endpointInfo := proxy.NewBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology) + + // This logic ensures we're deduping potential overlapping endpoints + // isLocal should not vary between matching IPs, but if it does, we + // favor a true value here if it exists. + if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal { + endpointsByIP[endpointInfo.IP()] = endpointInfo + } + } + + return endpointsByIP +} + +func (cache *EndpointSliceCache) isLocal(hostname string) bool { + return len(cache.hostname) > 0 && hostname == cache.hostname +} + +// esInfoChanged returns true if the esInfo parameter should be set as a new +// pending value in the cache. +func (cache *EndpointSliceCache) esInfoChanged(serviceKey apimachinerytypes.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool { + if _, ok := cache.trackerByServiceMap[serviceKey]; ok { + appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey] + pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey] + + // If there's already a pending value, return whether or not this would + // change that. + if pendingOk { + return !reflect.DeepEqual(esInfo, pendingInfo) + } + + // If there's already an applied value, return whether or not this would + // change that. + if appliedOk { + return !reflect.DeepEqual(esInfo, appliedInfo) + } + } + + // If this is marked for removal and does not exist in the cache, no changes + // are necessary. + if esInfo.Remove { + return false + } + + // If not in the cache, and not marked for removal, it should be added. + return true +} + +// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that +// has been grouped by service port and IP. +func endpointsMapFromEndpointInfo(endpointInfoBySP map[proxy.ServicePortName]map[string]proxy.Endpoint) types.EndpointsMap { + endpointsMap := types.EndpointsMap{} + + // transform endpointInfoByServicePort into an endpointsMap. + for svcPortName, endpointInfoByIP := range endpointInfoBySP { + if len(endpointInfoByIP) > 0 { + endpointsMap[svcPortName] = map[string]proxy.Endpoint{} + for _, endpointInfo := range endpointInfoByIP { + endpointsMap[svcPortName][endpointInfo.String()] = endpointInfo + } + } + } + + return endpointsMap +} + +// endpointSliceCacheKeys returns cache keys used for a given EndpointSlice. +func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (apimachinerytypes.NamespacedName, string, error) { + var err error + serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName] + if !ok || serviceName == "" { + err = fmt.Errorf("no %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name) + } else if endpointSlice.Namespace == "" || endpointSlice.Name == "" { + err = fmt.Errorf("expected EndpointSlice name and namespace to be set: %v", endpointSlice) + } + return apimachinerytypes.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err +} + +// byAddress helps sort endpointInfo +type byAddress []*endpointInfo + +func (e byAddress) Len() int { + return len(e) +} +func (e byAddress) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} +func (e byAddress) Less(i, j int) bool { + return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",") +} + +// byPort helps sort EndpointSlice ports by port number +type byPort []discovery.EndpointPort + +func (p byPort) Len() int { + return len(p) +} +func (p byPort) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} +func (p byPort) Less(i, j int) bool { + return *p[i].Port < *p[j].Port +} diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index a3434a1599f..2db298c91ee 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -20,6 +20,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" @@ -30,6 +31,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/querier" + "github.com/vmware-tanzu/antrea/pkg/features" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" "github.com/vmware-tanzu/antrea/third_party/proxy/config" @@ -41,9 +43,10 @@ const ( ) type proxier struct { - once sync.Once - endpointsConfig *config.EndpointsConfig - serviceConfig *config.ServiceConfig + once sync.Once + endpointSliceConfig *config.EndpointSliceConfig + endpointsConfig *config.EndpointsConfig + serviceConfig *config.ServiceConfig // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since last syncProxyRules call. For a single object, // changes are accumulated. Once both endpointsChanges and serviceChanges @@ -64,11 +67,12 @@ type proxier struct { // serviceStringMapMutex protects serviceStringMap object. serviceStringMapMutex sync.Mutex - runner *k8sproxy.BoundedFrequencyRunner - stopChan <-chan struct{} - agentQuerier querier.AgentQuerier - ofClient openflow.Client - isIPv6 bool + runner *k8sproxy.BoundedFrequencyRunner + stopChan <-chan struct{} + agentQuerier querier.AgentQuerier + ofClient openflow.Client + isIPv6 bool + enableEndpointSlice bool } func (p *proxier) isInitialized() bool { @@ -105,6 +109,7 @@ func (p *proxier) removeStaleServices() { } } delete(p.serviceInstalledMap, svcPortName) + delete(p.endpointInstalledMap, svcPortName) p.deleteServiceByIP(svcInfo.String()) p.groupCounter.Recycle(svcPortName) } @@ -276,6 +281,9 @@ func (p *proxier) installServices() { } p.serviceInstalledMap[svcPortName] = svcPort + for _, endpoint := range endpointUpdateList { + p.endpointInstalledMap[svcPortName][endpoint.String()] = struct{}{} + } p.addServiceByIP(svcInfo.String(), svcPortName) } } @@ -353,6 +361,31 @@ func (p *proxier) OnEndpointsSynced() { } } +func (p *proxier) OnEndpointSliceAdd(endpointSlice *v1beta1.EndpointSlice) { + if p.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, false) && p.isInitialized() { + p.runner.Run() + } +} + +func (p *proxier) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *v1beta1.EndpointSlice) { + if p.endpointsChanges.OnEndpointSliceUpdate(newEndpointSlice, false) && p.isInitialized() { + p.runner.Run() + } +} + +func (p *proxier) OnEndpointSliceDelete(endpointSlice *v1beta1.EndpointSlice) { + if p.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, true) && p.isInitialized() { + p.runner.Run() + } +} + +func (p *proxier) OnEndpointSlicesSynced() { + p.endpointsChanges.OnEndpointsSynced() + if p.isInitialized() { + p.runner.Run() + } +} + func (p *proxier) OnServiceAdd(service *corev1.Service) { p.OnServiceUpdate(nil, service) } @@ -412,7 +445,11 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { func (p *proxier) Run(stopCh <-chan struct{}) { p.once.Do(func() { go p.serviceConfig.Run(stopCh) - go p.endpointsConfig.Run(stopCh) + if p.enableEndpointSlice { + go p.endpointSliceConfig.Run(stopCh) + } else { + go p.endpointsConfig.Run(stopCh) + } p.stopChan = stopCh p.SyncLoop() }) @@ -429,11 +466,14 @@ func NewProxier( ) metrics.Register() klog.Infof("Creating proxier with IPv6 enabled=%t", isIPv6) + + enableEndpointSlice := features.DefaultFeatureGate.Enabled(features.EndpointSlice) + p := &proxier{ - endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), + enableEndpointSlice: enableEndpointSlice, serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), - endpointsChanges: newEndpointsChangesTracker(hostname), serviceChanges: newServiceChangesTracker(recorder, isIPv6), + endpointsChanges: newEndpointsChangesTracker(hostname, enableEndpointSlice, isIPv6), serviceMap: k8sproxy.ServiceMap{}, serviceInstalledMap: k8sproxy.ServiceMap{}, endpointInstalledMap: map[k8sproxy.ServicePortName]map[string]struct{}{}, @@ -444,7 +484,13 @@ func NewProxier( isIPv6: isIPv6, } p.serviceConfig.RegisterEventHandler(p) - p.endpointsConfig.RegisterEventHandler(p) + if enableEndpointSlice { + p.endpointSliceConfig = config.NewEndpointSliceConfig(informerFactory.Discovery().V1beta1().EndpointSlices(), resyncPeriod) + p.endpointSliceConfig.RegisterEventHandler(p) + } else { + p.endpointsConfig = config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod) + p.endpointsConfig.RegisterEventHandler(p) + } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, 0, 30*time.Second, -1) return p } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 3efe5349d49..723654cc9ea 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -88,7 +88,7 @@ func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { corev1.EventSource{Component: componentName, Host: hostname}, ) p := &proxier{ - endpointsChanges: newEndpointsChangesTracker(hostname), + endpointsChanges: newEndpointsChangesTracker(hostname, false, isIPv6), serviceChanges: newServiceChangesTracker(recorder, isIPv6), serviceMap: k8sproxy.ServiceMap{}, serviceInstalledMap: k8sproxy.ServiceMap{}, @@ -152,6 +152,61 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { fp.syncProxyRules() } +func TestLoadbalancer(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient := ofmock.NewMockClient(ctrl) + fp := NewFakeProxier(mockOFClient, false) + + svcIPv4 := net.ParseIP("10.20.30.41") + svcPort := 80 + loadBalancerIPv4 := net.ParseIP("169.254.0.1") + svcPortName := k8sproxy.ServicePortName{ + NamespacedName: makeNamespaceName("ns1", "svc1"), + Port: "80", + Protocol: corev1.ProtocolTCP, + } + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() + svc.Spec.LoadBalancerIP = loadBalancerIPv4.String() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + ingress := []corev1.LoadBalancerIngress{{IP: loadBalancerIPv4.String()}} + svc.Status.LoadBalancer.Ingress = ingress + svc.Spec.Ports = []corev1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + + epIP := net.ParseIP("10.180.0.1") + makeEndpointsMap(fp, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: epIP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} + }), + ) + + groupID, _ := fp.groupCounter.Get(svcPortName) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any(), false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallLoadBalancerServiceFromOutsideFlows(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + fp.syncProxyRules() +} + func TestClusterIPv4(t *testing.T) { testClusterIP(t, net.ParseIP("10.20.30.41"), net.ParseIP("10.180.0.1"), false) } @@ -250,7 +305,6 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { }), ) makeEndpointsMap(fp) - fp.syncProxyRules() } diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 1ae07e0d4b1..a8e4aca2370 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -33,6 +33,11 @@ const ( // Allows to apply ClusterNetworkPolicy and AntreaNetworkPolicy CRDs. AntreaPolicy featuregate.Feature = "AntreaPolicy" + // alpha: v0.13 + // Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this + // flag will not take effect. + EndpointSlice featuregate.Feature = "EndpointSlice" + // alpha: v0.8 // beta: v0.11 // Enable antrea proxy which provides ServiceLB for in-cluster services in antrea agent. @@ -72,6 +77,7 @@ var ( defaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ AntreaPolicy: {Default: false, PreRelease: featuregate.Alpha}, AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, + EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, Traceflow: {Default: true, PreRelease: featuregate.Beta}, FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, NetworkPolicyStats: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/third_party/proxy/config/config.go b/third_party/proxy/config/config.go index 16b4acfa4c2..9cc67c6fa94 100644 --- a/third_party/proxy/config/config.go +++ b/third_party/proxy/config/config.go @@ -39,8 +39,10 @@ import ( "time" "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/klog" ) @@ -79,6 +81,23 @@ type EndpointsHandler interface { OnEndpointsSynced() } +// EndpointSliceHandler is an abstract interface of objects which receive +// notifications about endpoint slice object changes. +type EndpointSliceHandler interface { + // OnEndpointSliceAdd is called whenever creation of new endpoint slice + // object is observed. + OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) + // OnEndpointSliceUpdate is called whenever modification of an existing + // endpoint slice object is observed. + OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) + // OnEndpointSliceDelete is called whenever deletion of an existing + // endpoint slice object is observed. + OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) + // OnEndpointSlicesSynced is called once all the initial event handlers were + // called and the state is fully propagated to local cache. + OnEndpointSlicesSynced() +} + // EndpointsConfig tracks a set of endpoints configurations. type EndpointsConfig struct { listerSynced cache.InformerSynced @@ -260,3 +279,94 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) { c.eventHandlers[i].OnServiceDelete(service) } } + +// EndpointSliceConfig tracks a set of endpoints configurations. +type EndpointSliceConfig struct { + listerSynced cache.InformerSynced + eventHandlers []EndpointSliceHandler +} + +// NewEndpointSliceConfig creates a new EndpointSliceConfig. +func NewEndpointSliceConfig(endpointSliceInformer discoveryinformers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig { + result := &EndpointSliceConfig{ + listerSynced: endpointSliceInformer.Informer().HasSynced, + } + + endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddEndpointSlice, + UpdateFunc: result.handleUpdateEndpointSlice, + DeleteFunc: result.handleDeleteEndpointSlice, + }, + resyncPeriod, + ) + + return result +} + +// RegisterEventHandler registers a handler which is called on every endpoint slice change. +func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + +// Run waits for cache synced and invokes handlers after syncing. +func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) { + klog.Info("Starting endpoint slice config controller") + + if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) { + return + } + + for _, h := range c.eventHandlers { + klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()") + h.OnEndpointSlicesSynced() + } +} + +func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) { + endpointSlice, ok := obj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointSliceAdd %+v", endpointSlice) + h.OnEndpointSliceAdd(endpointSlice) + } +} + +func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) { + oldEndpointSlice, ok := oldObj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj)) + return + } + newEndpointSlice, ok := newObj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj)) + return + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate") + h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice) + } +} + +func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) { + endpointSlice, ok := obj.(*discovery.EndpointSlice) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + if endpointSlice, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointsDelete") + h.OnEndpointSliceDelete(endpointSlice) + } +} diff --git a/third_party/proxy/endpoints.go b/third_party/proxy/endpoints.go index 85e1383454b..7cc121e3bb4 100644 --- a/third_party/proxy/endpoints.go +++ b/third_party/proxy/endpoints.go @@ -40,6 +40,9 @@ Modifies: package proxy import ( + "net" + "strconv" + utilproxy "github.com/vmware-tanzu/antrea/third_party/proxy/util" ) @@ -85,3 +88,11 @@ func (info *BaseEndpointInfo) Port() (int, error) { func (info *BaseEndpointInfo) Equal(other Endpoint) bool { return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() } + +func NewBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string) *BaseEndpointInfo { + return &BaseEndpointInfo{ + Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), + IsLocal: isLocal, + Topology: topology, + } +}