From 079c4e79663c5f5d8d6c9289c8f05a7c1d325ffa Mon Sep 17 00:00:00 2001 From: Lan Luo Date: Wed, 27 Apr 2022 16:03:29 +0800 Subject: [PATCH] Use Service ClusterIPs as MC Service endpoint Use Service ClusterIPs instead of Pod IP as MC Service Endpoint. The ServiceExport controller will watch only watch ServiceExport and Service events, wrap Services' ClusterIPs into a new Endpoint kind of ResourceExport. Signed-off-by: Lan Luo --- .../multicluster/serviceexport_controller.go | 72 ++++++++----------- .../serviceexport_controller_test.go | 14 +--- .../serviceexport_controller_test.go | 42 +---------- 3 files changed, 36 insertions(+), 92 deletions(-) diff --git a/multicluster/controllers/multicluster/serviceexport_controller.go b/multicluster/controllers/multicluster/serviceexport_controller.go index 96a6608ff8a..f89e6797879 100644 --- a/multicluster/controllers/multicluster/serviceexport_controller.go +++ b/multicluster/controllers/multicluster/serviceexport_controller.go @@ -19,7 +19,6 @@ package multicluster import ( "context" "reflect" - "sort" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -75,7 +74,6 @@ type ( const ( // cached indexer svcIndexerByType = "svc.type" - epIndexerByLabel = "ep.label" ) type reason int @@ -96,9 +94,7 @@ func NewServiceExportReconciler( installedSvcs: cache.NewIndexer(svcInfoKeyFunc, cache.Indexers{ svcIndexerByType: svcIndexerByTypeFunc, }), - installedEps: cache.NewIndexer(epInfoKeyFunc, cache.Indexers{ - epIndexerByLabel: epIndexerByLabelFunc, - }), + installedEps: cache.NewIndexer(epInfoKeyFunc, cache.Indexers{}), } return reconciler } @@ -117,34 +113,17 @@ func epInfoKeyFunc(obj interface{}) (string, error) { return common.NamespacedName(ep.namespace, ep.name), nil } -func epIndexerByLabelFunc(obj interface{}) ([]string, error) { - var info []string - ep := obj.(*epInfo) - keys := make([]string, len(ep.labels)) - i := 0 - for k := range ep.labels { - keys[i] = k - i++ - } - sort.Strings(keys) - for _, k := range keys { - info = append(info, k+ep.labels[k]) - } - return info, nil -} - //+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports/status,verbs=get;update;patch //+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports/finalizers,verbs=update //+kubebuilder:rbac:groups=multicluster.x-k8s.io,resources=serviceexports,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=multicluster.x-k8s.io,resources=serviceexports/status,verbs=get;update;patch //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;update -//+kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch;update // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // For ServiceExport Reconcile, it watches events of ServiceExport resources, -// and also Endpoints/Services resource. It will create/update/remove ResourceExport +// and also Services resource. It will create/update/remove ResourceExport // in a leader cluster for corresponding ServiceExport from a member cluster. func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { klog.V(2).InfoS("Reconciling ServiceExport", "serviceexport", req.NamespacedName) @@ -246,8 +225,8 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } - // We also watch Service and Endpoints events via events mapping function. - // Need to check cache and compare with cache if there is any change for Service or Endpoints. + // We also watch Service events via events mapping function. + // Need to check cache and compare with cache if there is any change for Service. var svcNoChange, epNoChange bool svcExportNSName := common.NamespacedName(r.leaderNamespace, svcResExportName) epExportNSName := common.NamespacedName(r.leaderNamespace, epResExportName) @@ -267,20 +246,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques Namespace: req.Namespace, }, } - - err = r.Client.Get(ctx, req.NamespacedName, ep) - if err != nil { - klog.ErrorS(err, "Failed to get Endpoints", "endpoints", req.String()) - if apierrors.IsNotFound(err) && epInstalled { - err = r.handleEndpointDeleteEvent(ctx, req, remoteCluster) - if err != nil { - return ctrl.Result{}, err - } - r.installedEps.Delete(epObj) - } - return ctrl.Result{}, client.IgnoreNotFound(err) - } - + ep.Subsets = []corev1.EndpointSubset{getSubsets(svc)} if epInstalled { installedEp := epObj.(*epInfo) if apiequality.Semantic.DeepEqual(getEndPointsPorts(ep), installedEp.ports) && @@ -449,7 +415,6 @@ func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&k8smcsv1alpha1.ServiceExport{}). Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objMapFunc)). - Watches(&source.Kind{Type: &corev1.Endpoints{}}, handler.EnqueueRequestsFromMapFunc(objMapFunc)). WithOptions(controller.Options{ MaxConcurrentReconciles: common.DefaultWorkerCount, }). @@ -517,7 +482,6 @@ func (r *ServiceExportReconciler) endpointsHandler( namespace: ep.Namespace, addressIPs: getEndPointsAddress(ep), ports: getEndPointsPorts(ep), - labels: ep.Labels, } r.refreshResourceExport(resName, kind, nil, ep, &re) existResExport := &mcsv1alpha1.ResourceExport{} @@ -621,3 +585,29 @@ func getResourceExportName(clusterID string, req ctrl.Request, kind string) stri func getStringPointer(str string) *string { return &str } + +func getSubsets(svc *corev1.Service) corev1.EndpointSubset { + var epSubset corev1.EndpointSubset + for _, ip := range svc.Spec.ClusterIPs { + epSubset.Addresses = append(epSubset.Addresses, corev1.EndpointAddress{IP: ip}) + } + + epSubset.Ports = portsConverter(svc.Spec.Ports) + return epSubset +} + +// portsConverter will convert Service's port to EndpointPort +func portsConverter(ports []corev1.ServicePort) []corev1.EndpointPort { + if len(ports) == 0 { + return nil + } + var epPorts []corev1.EndpointPort + for _, p := range ports { + epPorts = append(epPorts, corev1.EndpointPort{ + Name: p.Name, + Port: p.Port, + Protocol: p.Protocol, + }) + } + return epPorts +} diff --git a/multicluster/controllers/multicluster/serviceexport_controller_test.go b/multicluster/controllers/multicluster/serviceexport_controller_test.go index da0bca0a28f..1275d7520b1 100644 --- a/multicluster/controllers/multicluster/serviceexport_controller_test.go +++ b/multicluster/controllers/multicluster/serviceexport_controller_test.go @@ -222,18 +222,9 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) { ports: svcNginx.Spec.Ports, svcType: string(svcNginx.Spec.Type), } - epInfo := &epInfo{ - name: epNginx.Name, - namespace: epNginx.Namespace, - addressIPs: getEndPointsAddress(epNginx), - ports: getEndPointsPorts(epNginx), - labels: epNginx.Labels, - } newSvcNginx := svcNginx.DeepCopy() newSvcNginx.Spec.Ports = []corev1.ServicePort{svcPort8080} - newEpNginx := epNginx.DeepCopy() - newEpNginx.Subsets[0].Ports = epPorts8080 re := mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ @@ -259,13 +250,12 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) { existEpRe.Name = "cluster-a-default-nginx-endpoints" existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: epNginxSubset} - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, newEpNginx, existSvcExport).Build() + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, existSvcExport).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvcRe, existEpRe).Build() _ = commonarea.NewFakeRemoteCommonArea(scheme, &remoteMgr, fakeRemoteClient, "leader-cluster", "default") r := NewServiceExportReconciler(fakeClient, scheme, &remoteMgr) r.installedSvcs.Add(sinfo) - r.installedEps.Add(epInfo) if _, err := r.Reconcile(ctx, nginxReq); err != nil { t.Errorf("ServiceExport Reconciler should update ResourceExports but got error = %v", err) } else { @@ -296,7 +286,7 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) { { Addresses: []corev1.EndpointAddress{ { - IP: "192.168.17.11", + IP: "192.168.2.3", }, }, Ports: epPorts8080, diff --git a/multicluster/test/integration/serviceexport_controller_test.go b/multicluster/test/integration/serviceexport_controller_test.go index 439a2c9e6c2..c52e835b6c1 100644 --- a/multicluster/test/integration/serviceexport_controller_test.go +++ b/multicluster/test/integration/serviceexport_controller_test.go @@ -39,7 +39,8 @@ import ( var _ = Describe("ServiceExport controller", func() { svcSpec := corev1.ServiceSpec{ - Ports: svcPorts, + ClusterIP: "10.96.11.10", + Ports: svcPorts, } svc := &corev1.Service{ @@ -53,7 +54,6 @@ var _ = Describe("ServiceExport controller", func() { Namespace: svc.Namespace, Name: svc.Name, } - epNamespacedName := svcNamespacedName ep := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -108,7 +108,7 @@ var _ = Describe("ServiceExport controller", func() { { Addresses: []corev1.EndpointAddress{ { - IP: "192.168.17.11", + IP: "10.96.11.10", }, }, Ports: epPorts, @@ -180,42 +180,6 @@ var _ = Describe("ServiceExport controller", func() { Expect(*conditions[0].Message).Should(Equal("the Service does not exist")) }) - It("Should update existing ResourceExport when corresponding Endpoints has new Endpoint", func() { - By("By update an Endpoint with a new address") - latestEp := &corev1.Endpoints{} - Expect(k8sClient.Get(ctx, epNamespacedName, latestEp)).Should(Succeed()) - addresses := latestEp.Subsets[0].Addresses - addresses = append(addresses, addr3) - latestEp.Subsets[0].Addresses = addresses - Expect(k8sClient.Update(ctx, latestEp)).Should(Succeed()) - time.Sleep(2 * time.Second) - epResExport := &mcsv1alpha1.ResourceExport{} - expectedEpResExport.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{ - Subsets: []corev1.EndpointSubset{ - { - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.17.11", - }, - { - IP: "192.168.17.13", - }, - }, - Ports: epPorts, - }, - }, - } - - var err error - Eventually(func() bool { - err = k8sClient.Get(ctx, types.NamespacedName{Namespace: LeaderNamespace, Name: epResExportName}, epResExport) - return err == nil - }, timeout, interval).Should(BeTrue()) - Expect(epResExport.ObjectMeta.Labels["sourceKind"]).Should(Equal("Endpoints")) - Expect(epResExport.Spec).Should(Equal(expectedEpResExport.Spec)) - - }) - It("Should delete existing ResourceExport when existing ServiceExport is deleted", func() { By("By remove a ServiceExport resource") err := k8sClient.Delete(ctx, svcExport)