Skip to content

Commit

Permalink
Use Service ClusterIPs as MC Service endpoint
Browse files Browse the repository at this point in the history
Use Service ClusterIPs instead of Pod IP as MC Service Endpoint.
The ServiceExport controller will watch only watch ServiceExport and
Service events, wrap Services' ClusterIPs into a new Endpoint kind of
ResourceExport.

Signed-off-by: Lan Luo <[email protected]>
  • Loading branch information
luolanzone committed Apr 26, 2022
1 parent 9cadc45 commit f7ac8fd
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 107 deletions.
103 changes: 48 additions & 55 deletions multicluster/controllers/multicluster/serviceexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package multicluster
import (
"context"
"reflect"
"sort"

corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -75,7 +74,6 @@ type (
const (
// cached indexer
svcIndexerByType = "svc.type"
epIndexerByLabel = "ep.label"
)

type reason int
Expand All @@ -96,9 +94,7 @@ func NewServiceExportReconciler(
installedSvcs: cache.NewIndexer(svcInfoKeyFunc, cache.Indexers{
svcIndexerByType: svcIndexerByTypeFunc,
}),
installedEps: cache.NewIndexer(epInfoKeyFunc, cache.Indexers{
epIndexerByLabel: epIndexerByLabelFunc,
}),
installedEps: cache.NewIndexer(epInfoKeyFunc, cache.Indexers{}),
}
return reconciler
}
Expand All @@ -117,34 +113,17 @@ func epInfoKeyFunc(obj interface{}) (string, error) {
return common.NamespacedName(ep.namespace, ep.name), nil
}

func epIndexerByLabelFunc(obj interface{}) ([]string, error) {
var info []string
ep := obj.(*epInfo)
keys := make([]string, len(ep.labels))
i := 0
for k := range ep.labels {
keys[i] = k
i++
}
sort.Strings(keys)
for _, k := range keys {
info = append(info, k+ep.labels[k])
}
return info, nil
}

//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports/finalizers,verbs=update
//+kubebuilder:rbac:groups=multicluster.x-k8s.io,resources=serviceexports,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=multicluster.x-k8s.io,resources=serviceexports/status,verbs=get;update;patch
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;update
//+kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch;update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// For ServiceExport Reconcile, it watches events of ServiceExport resources,
// and also Endpoints/Services resource. It will create/update/remove ResourceExport
// and also Services resource. It will create/update/remove ResourceExport
// in a leader cluster for corresponding ServiceExport from a member cluster.
func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(2).InfoS("Reconciling ServiceExport", "serviceexport", req.NamespacedName)
Expand Down Expand Up @@ -185,27 +164,34 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
r.leaderNamespace = remoteCluster.GetNamespace()
r.leaderClusterID = string(remoteCluster.GetClusterID())

svc := &corev1.Service{}
if err := r.Client.Get(ctx, req.NamespacedName, &svcExport); err != nil {
klog.V(2).ErrorS(err, "Unable to fetch ServiceExport", "serviceexport", req.String())
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
cleanup := func() error {
if svcInstalled {
err = r.handleServiceDeleteEvent(ctx, req, remoteCluster)
if err != nil {
return ctrl.Result{}, err
return err
}
r.installedSvcs.Delete(svcObj)
}

if epInstalled {
err = r.handleEndpointDeleteEvent(ctx, req, remoteCluster)
if err != nil {
return ctrl.Result{}, err
return err
}
r.installedEps.Delete(epObj)
}
return nil
}

svc := &corev1.Service{}
if err := r.Client.Get(ctx, req.NamespacedName, &svcExport); err != nil {
klog.V(2).ErrorS(err, "Unable to fetch ServiceExport", "serviceexport", req.String())
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
if err := cleanup(); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

Expand All @@ -214,12 +200,8 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
err = r.Client.Get(ctx, req.NamespacedName, svc)
if err != nil {
if apierrors.IsNotFound(err) {
if svcInstalled {
err = r.handleServiceDeleteEvent(ctx, req, remoteCluster)
if err != nil {
return ctrl.Result{}, err
}
r.installedSvcs.Delete(svcObj)
if err := cleanup(); err != nil {
return ctrl.Result{}, err
}
err = r.updateSvcExportStatus(ctx, req, notFound)
if err != nil {
Expand All @@ -244,8 +226,8 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}

// We also watch Service and Endpoints events via events mapping function.
// Need to check cache and compare with cache if there is any change for Service or Endpoints.
// We also watch Service events via events mapping function.
// Need to check cache and compare with cache if there is any change for Service.
var svcNoChange, epNoChange bool
svcExportNSName := common.NamespacedName(r.leaderNamespace, svcResExportName)
epExportNSName := common.NamespacedName(r.leaderNamespace, epResExportName)
Expand All @@ -265,20 +247,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
Namespace: req.Namespace,
},
}

err = r.Client.Get(ctx, req.NamespacedName, ep)
if err != nil {
klog.ErrorS(err, "Failed to get Endpoints", "endpoints", req.String())
if apierrors.IsNotFound(err) && epInstalled {
err = r.handleEndpointDeleteEvent(ctx, req, remoteCluster)
if err != nil {
return ctrl.Result{}, err
}
r.installedEps.Delete(epObj)
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}

ep.Subsets = []corev1.EndpointSubset{getSubsets(svc)}
if epInstalled {
installedEp := epObj.(*epInfo)
if apiequality.Semantic.DeepEqual(getEndPointsPorts(ep), installedEp.ports) &&
Expand Down Expand Up @@ -447,7 +416,6 @@ func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&k8smcsv1alpha1.ServiceExport{}).
Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objMapFunc)).
Watches(&source.Kind{Type: &corev1.Endpoints{}}, handler.EnqueueRequestsFromMapFunc(objMapFunc)).
WithOptions(controller.Options{
MaxConcurrentReconciles: common.DefaultWorkerCount,
}).
Expand Down Expand Up @@ -515,7 +483,6 @@ func (r *ServiceExportReconciler) endpointsHandler(
namespace: ep.Namespace,
addressIPs: getEndPointsAddress(ep),
ports: getEndPointsPorts(ep),
labels: ep.Labels,
}
r.refreshResourceExport(resName, kind, nil, ep, &re)
existResExport := &mcsv1alpha1.ResourceExport{}
Expand Down Expand Up @@ -619,3 +586,29 @@ func getResourceExportName(clusterID string, req ctrl.Request, kind string) stri
func getStringPointer(str string) *string {
return &str
}

func getSubsets(svc *corev1.Service) corev1.EndpointSubset {
var epSubset corev1.EndpointSubset
for _, ip := range svc.Spec.ClusterIPs {
epSubset.Addresses = append(epSubset.Addresses, corev1.EndpointAddress{IP: ip})
}

epSubset.Ports = portsConverter(svc.Spec.Ports)
return epSubset
}

// portsConverter will convert Service's port to EndpointPort
func portsConverter(ports []corev1.ServicePort) []corev1.EndpointPort {
if len(ports) == 0 {
return nil
}
var epPorts []corev1.EndpointPort
for _, p := range ports {
epPorts = append(epPorts, corev1.EndpointPort{
Name: p.Name,
Port: p.Port,
Protocol: p.Protocol,
})
}
return epPorts
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,9 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) {
ports: svcNginx.Spec.Ports,
svcType: string(svcNginx.Spec.Type),
}
epInfo := &epInfo{
name: epNginx.Name,
namespace: epNginx.Namespace,
addressIPs: getEndPointsAddress(epNginx),
ports: getEndPointsPorts(epNginx),
labels: epNginx.Labels,
}

newSvcNginx := svcNginx.DeepCopy()
newSvcNginx.Spec.Ports = []corev1.ServicePort{svcPort8080}
newEpNginx := epNginx.DeepCopy()
newEpNginx.Subsets[0].Ports = epPorts8080

re := mcsv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -259,13 +250,12 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) {
existEpRe.Name = "cluster-a-default-nginx-endpoints"
existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: epNginxSubset}

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, newEpNginx, existSvcExport).Build()
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, existSvcExport).Build()
fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvcRe, existEpRe).Build()

_ = commonarea.NewFakeRemoteCommonArea(scheme, &remoteMgr, fakeRemoteClient, "leader-cluster", "default")
r := NewServiceExportReconciler(fakeClient, scheme, &remoteMgr)
r.installedSvcs.Add(sinfo)
r.installedEps.Add(epInfo)
if _, err := r.Reconcile(ctx, nginxReq); err != nil {
t.Errorf("ServiceExport Reconciler should update ResourceExports but got error = %v", err)
} else {
Expand Down Expand Up @@ -296,7 +286,7 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) {
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.17.11",
IP: "192.168.2.3",
},
},
Ports: epPorts8080,
Expand Down
42 changes: 3 additions & 39 deletions multicluster/test/integration/serviceexport_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (

var _ = Describe("ServiceExport controller", func() {
svcSpec := corev1.ServiceSpec{
Ports: svcPorts,
ClusterIP: "10.96.11.10",
Ports: svcPorts,
}

svc := &corev1.Service{
Expand All @@ -53,7 +54,6 @@ var _ = Describe("ServiceExport controller", func() {
Namespace: svc.Namespace,
Name: svc.Name,
}
epNamespacedName := svcNamespacedName

ep := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -108,7 +108,7 @@ var _ = Describe("ServiceExport controller", func() {
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.17.11",
IP: "10.96.11.10",
},
},
Ports: epPorts,
Expand Down Expand Up @@ -180,42 +180,6 @@ var _ = Describe("ServiceExport controller", func() {
Expect(*conditions[0].Message).Should(Equal("the Service does not exist"))
})

It("Should update existing ResourceExport when corresponding Endpoints has new Endpoint", func() {
By("By update an Endpoint with a new address")
latestEp := &corev1.Endpoints{}
Expect(k8sClient.Get(ctx, epNamespacedName, latestEp)).Should(Succeed())
addresses := latestEp.Subsets[0].Addresses
addresses = append(addresses, addr3)
latestEp.Subsets[0].Addresses = addresses
Expect(k8sClient.Update(ctx, latestEp)).Should(Succeed())
time.Sleep(2 * time.Second)
epResExport := &mcsv1alpha1.ResourceExport{}
expectedEpResExport.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.17.11",
},
{
IP: "192.168.17.13",
},
},
Ports: epPorts,
},
},
}

var err error
Eventually(func() bool {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: LeaderNamespace, Name: epResExportName}, epResExport)
return err == nil
}, timeout, interval).Should(BeTrue())
Expect(epResExport.ObjectMeta.Labels["sourceKind"]).Should(Equal("Endpoints"))
Expect(epResExport.Spec).Should(Equal(expectedEpResExport.Spec))

})

It("Should delete existing ResourceExport when existing ServiceExport is deleted", func() {
By("By remove a ServiceExport resource")
err := k8sClient.Delete(ctx, svcExport)
Expand Down
2 changes: 1 addition & 1 deletion multicluster/test/integration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ var _ = BeforeSuite(func() {
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases"),
filepath.Join("..", "..", "config", "crd", "k8smcs"),
filepath.Join("..", "..", "..", "build", "yamls", "base", "crds.yml")},
filepath.Join("..", "..", "..", "build", "charts", "antrea", "templates", "crds", "clusternetworkpolicy.yaml")},
ErrorIfCRDPathMissing: true,
UseExistingCluster: &useExistingCluster,
}
Expand Down

0 comments on commit f7ac8fd

Please sign in to comment.