Fix data race in serviceexport_controller #4305

Merged 1 commit on Oct 18, 2022

4 changes: 2 additions & 2 deletions multicluster/controllers/multicluster/gateway_controller.go
@@ -202,8 +202,8 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
     return ctrl.NewControllerManagedBy(mgr).
         For(&mcsv1alpha1.Gateway{}).
         WithOptions(controller.Options{
-            // TODO: add a lock for serviceCIDR if there is any plan to
-            // increase this concurrent number.
+            // TODO: add a lock for r.serviceCIDR and r.localClusterID if
+            // there is any plan to increase this concurrent number.
             MaxConcurrentReconciles: 1,
         }).
         Complete(r)
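The updated TODO concerns what would change if MaxConcurrentReconciles were ever raised above 1: r.serviceCIDR and r.localClusterID could then be written and read by several reconcile goroutines at once and would need a guard. Below is a minimal sketch of one way to do that; the type and method names are illustrative only and are not part of this PR or the Antrea code base.

```go
// Illustrative sketch only, not part of this PR: how the Gateway reconciler's
// shared fields might be guarded if MaxConcurrentReconciles were raised above 1.
package main

import (
	"fmt"
	"sync"
)

type guardedGatewayState struct {
	mu             sync.RWMutex
	serviceCIDR    string
	localClusterID string
}

// set is called by whichever reconcile goroutine learns the values first.
func (s *guardedGatewayState) set(cidr, clusterID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.serviceCIDR = cidr
	s.localClusterID = clusterID
}

// get can be called concurrently from any reconcile goroutine.
func (s *guardedGatewayState) get() (string, string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.serviceCIDR, s.localClusterID
}

func main() {
	s := &guardedGatewayState{}
	s.set("10.96.0.0/12", "cluster-a")
	cidr, id := s.get()
	fmt.Println(cidr, id)
}
```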
39 changes: 27 additions & 12 deletions multicluster/controllers/multicluster/serviceexport_controller.go
@@ -19,6 +19,7 @@ package multicluster
 import (
     "context"
     "reflect"
+    "sync"

     corev1 "k8s.io/api/core/v1"
     apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -60,8 +61,10 @@ type (
     // ServiceExportReconciler reconciles a ServiceExport object in the member cluster.
     ServiceExportReconciler struct {
         client.Client
+        mutex            sync.Mutex
         Scheme           *runtime.Scheme
         commonAreaGetter RemoteCommonAreaGetter
+        remoteCommonArea commonarea.RemoteCommonArea
         installedSvcs    cache.Indexer
         installedEps     cache.Indexer
         leaderNamespace  string
@@ -131,15 +134,9 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
         klog.InfoS("Skip reconciling, no corresponding ServiceExport")
         return ctrl.Result{}, nil
     }
-    var commonArea commonarea.RemoteCommonArea
-    commonArea, r.localClusterID, err = r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
-    if commonArea == nil {
-        return ctrl.Result{Requeue: true}, err
+    if requeue := r.checkRemoteCommonArea(); requeue {
+        return ctrl.Result{Requeue: true}, nil
     }
-
-    r.leaderNamespace = commonArea.GetNamespace()
-    r.leaderClusterID = string(commonArea.GetClusterID())
-
     var svcExport k8smcsv1alpha1.ServiceExport
     svcObj, svcInstalled, _ := r.installedSvcs.GetByKey(req.String())
     epsObj, epsInstalled, _ := r.installedEps.GetByKey(req.String())
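The lines removed in this hunk assigned r.localClusterID, r.leaderNamespace and r.leaderClusterID on every call to Reconcile with no synchronization, so any goroutine reading those fields concurrently could race with the writes; this is the class of bug the Go race detector (go test -race) reports. The standalone sketch below reproduces the pattern. It is not Antrea code, just an illustration of an unsynchronized field write that the race detector flags.

```go
// Standalone sketch of the race pattern, not Antrea code: one goroutine
// repeatedly assigns a shared string field while another reads it.
// Running this with `go run -race main.go` reports a data race.
package main

import "time"

type reconciler struct {
	localClusterID string
}

func main() {
	r := &reconciler{}
	go func() {
		for i := 0; i < 1000; i++ {
			r.localClusterID = "cluster-a" // unsynchronized write
		}
	}()
	go func() {
		for i := 0; i < 1000; i++ {
			_ = r.localClusterID // unsynchronized read
		}
	}()
	time.Sleep(100 * time.Millisecond)
}
```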
@@ -150,11 +147,11 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
         // When controller restarts, the Service is not in cache, but it is still possible
         // we need to remove ResourceExports. So leave it to the caller to check the 'svcInstalled'
         // before deletion or try to delete any way.
-        err = r.handleServiceDeleteEvent(ctx, req, commonArea)
+        err = r.handleServiceDeleteEvent(ctx, req, r.remoteCommonArea)
         if err != nil {
             return err
         }
-        err = r.handleEndpointDeleteEvent(ctx, req, commonArea)
+        err = r.handleEndpointDeleteEvent(ctx, req, r.remoteCommonArea)
         if err != nil {
             return err
         }
@@ -299,7 +296,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
     if !svcNoChange {
         klog.InfoS("Service has new changes, update ResourceExport", "service", req.String(),
             "resourceexport", svcExportNSName)
-        err := r.serviceHandler(ctx, req, svc, svcResExportName, re, commonArea)
+        err := r.serviceHandler(ctx, req, svc, svcResExportName, re, r.remoteCommonArea)
         if err != nil {
             klog.ErrorS(err, "Failed to handle Service change", "service", req.String())
             return ctrl.Result{}, err
@@ -314,7 +311,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
     if !epNoChange {
         klog.InfoS("Endpoints have new change, update ResourceExport", "endpoints",
             req.String(), "resourceexport", epExportNSName)
-        err = r.endpointsHandler(ctx, req, eps, epResExportName, re, commonArea)
+        err = r.endpointsHandler(ctx, req, eps, epResExportName, re, r.remoteCommonArea)
         if err != nil {
             klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String())
             return ctrl.Result{}, err
@@ -324,6 +321,24 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
     return ctrl.Result{}, nil
 }

+// checkRemoteCommonArea initializes remoteCommonArea for the reconciler if necessary,
+// or tells the Reconcile function to requeue if the remoteCommonArea is not ready.
+func (r *ServiceExportReconciler) checkRemoteCommonArea() bool {
+    r.mutex.Lock()
+    defer r.mutex.Unlock()
+
+    if r.remoteCommonArea == nil {
+        commonArea, localClusterID, _ := r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
+        if commonArea == nil {
+            return true
+        }
+        r.leaderClusterID, r.localClusterID = string(commonArea.GetClusterID()), localClusterID
+        r.leaderNamespace = commonArea.GetNamespace()
+        r.remoteCommonArea = commonArea
+    }
+    return false
+}
+
 func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, req ctrl.Request,
     commonArea commonarea.RemoteCommonArea) error {
     svcResExportName := getResourceExportName(r.localClusterID, req, "service")
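The new checkRemoteCommonArea follows a lazy-initialization pattern: a mutex plus a nil check rather than sync.Once, because the common area may not be ready yet and the reconciler has to be able to requeue and retry until it is; sync.Once would fire exactly once even on a failed attempt. Below is a generic sketch of the same pattern under that assumption, with illustrative names that are not part of the Antrea code base.

```go
// Generic sketch of the pattern used by checkRemoteCommonArea: lazy, mutex-guarded
// initialization that can be retried until the dependency becomes available.
// All names here are illustrative; they are not Antrea identifiers.
package main

import (
	"fmt"
	"sync"
)

type connection struct{ endpoint string }

type lazyClient struct {
	mu   sync.Mutex
	conn *connection
	dial func() *connection // returns nil while the dependency is not ready
}

// ensure returns true when the caller should requeue and try again later.
// sync.Once is not a good fit here because it would run exactly once even
// when dial() fails, leaving conn permanently nil.
func (c *lazyClient) ensure() (requeue bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.conn == nil {
		conn := c.dial()
		if conn == nil {
			return true
		}
		c.conn = conn
	}
	return false
}

func main() {
	ready := false
	c := &lazyClient{dial: func() *connection {
		if !ready {
			return nil
		}
		return &connection{endpoint: "leader"}
	}}
	fmt.Println(c.ensure()) // true: dependency not ready, requeue
	ready = true
	fmt.Println(c.ensure()) // false: initialized on this call
	fmt.Println(c.ensure()) // false: already initialized, no re-dial
}
```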