Skip to content

Commit

Permalink
Merge d35e828 into 6a26bf7
Browse files Browse the repository at this point in the history
  • Loading branch information
Dyanngg authored Oct 17, 2022
2 parents 6a26bf7 + d35e828 commit 233fcc3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
4 changes: 2 additions & 2 deletions multicluster/controllers/multicluster/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&mcsv1alpha1.Gateway{}).
WithOptions(controller.Options{
// TODO: add a lock for serviceCIDR if there is any plan to
// increase this concurrent number.
// TODO: add a lock for r.serviceCIDR and r.localClusterID if
// there is any plan to increase this concurrent number.
MaxConcurrentReconciles: 1,
}).
Complete(r)
Expand Down
39 changes: 27 additions & 12 deletions multicluster/controllers/multicluster/serviceexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package multicluster
import (
"context"
"reflect"
"sync"

corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -60,8 +61,10 @@ type (
// ServiceExportReconciler reconciles a ServiceExport object in the member cluster.
ServiceExportReconciler struct {
client.Client
mutex sync.Mutex
Scheme *runtime.Scheme
commonAreaGetter RemoteCommonAreaGetter
remoteCommonArea commonarea.RemoteCommonArea
installedSvcs cache.Indexer
installedEps cache.Indexer
leaderNamespace string
Expand Down Expand Up @@ -131,15 +134,9 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
klog.InfoS("Skip reconciling, no corresponding ServiceExport")
return ctrl.Result{}, nil
}
var commonArea commonarea.RemoteCommonArea
commonArea, r.localClusterID, err = r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
if commonArea == nil {
return ctrl.Result{Requeue: true}, err
if requeue := r.checkRemoteCommonArea(); requeue {
return ctrl.Result{Requeue: true}, nil
}

r.leaderNamespace = commonArea.GetNamespace()
r.leaderClusterID = string(commonArea.GetClusterID())

var svcExport k8smcsv1alpha1.ServiceExport
svcObj, svcInstalled, _ := r.installedSvcs.GetByKey(req.String())
epsObj, epsInstalled, _ := r.installedEps.GetByKey(req.String())
Expand All @@ -150,11 +147,11 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// When controller restarts, the Service is not in cache, but it is still possible
// we need to remove ResourceExports. So leave it to the caller to check the 'svcInstalled'
// before deletion or try to delete any way.
err = r.handleServiceDeleteEvent(ctx, req, commonArea)
err = r.handleServiceDeleteEvent(ctx, req, r.remoteCommonArea)
if err != nil {
return err
}
err = r.handleEndpointDeleteEvent(ctx, req, commonArea)
err = r.handleEndpointDeleteEvent(ctx, req, r.remoteCommonArea)
if err != nil {
return err
}
Expand Down Expand Up @@ -299,7 +296,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if !svcNoChange {
klog.InfoS("Service has new changes, update ResourceExport", "service", req.String(),
"resourceexport", svcExportNSName)
err := r.serviceHandler(ctx, req, svc, svcResExportName, re, commonArea)
err := r.serviceHandler(ctx, req, svc, svcResExportName, re, r.remoteCommonArea)
if err != nil {
klog.ErrorS(err, "Failed to handle Service change", "service", req.String())
return ctrl.Result{}, err
Expand All @@ -314,7 +311,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if !epNoChange {
klog.InfoS("Endpoints have new change, update ResourceExport", "endpoints",
req.String(), "resourceexport", epExportNSName)
err = r.endpointsHandler(ctx, req, eps, epResExportName, re, commonArea)
err = r.endpointsHandler(ctx, req, eps, epResExportName, re, r.remoteCommonArea)
if err != nil {
klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String())
return ctrl.Result{}, err
Expand All @@ -324,6 +321,24 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

// checkRemoteCommonArea initializes remoteCommonArea for the reconciler if necessary,
// or tells the Reconcile function to requeue if the remoteCommonArea is not ready.
func (r *ServiceExportReconciler) checkRemoteCommonArea() bool {
r.mutex.Lock()
defer r.mutex.Unlock()

if r.remoteCommonArea == nil {
commonArea, localClusterID, _ := r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
if commonArea == nil {
return true
}
r.leaderClusterID, r.localClusterID = string(commonArea.GetClusterID()), localClusterID
r.leaderNamespace = commonArea.GetNamespace()
r.remoteCommonArea = commonArea
}
return false
}

func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea) error {
svcResExportName := getResourceExportName(r.localClusterID, req, "service")
Expand Down

0 comments on commit 233fcc3

Please sign in to comment.