From 2eacd209bfe482176918c5abf7cbdf06e8e101c7 Mon Sep 17 00:00:00 2001 From: Zhongcheng Lao Date: Tue, 1 Aug 2023 15:11:37 +0800 Subject: [PATCH] Using ClusterCacheTracker instead of remote.NewClusterClient --- controllers/controllers_suite_test.go | 20 ++++++-- controllers/serviceaccount_controller.go | 10 ++-- controllers/servicediscovery_controller.go | 10 ++-- controllers/vspherevm_controller.go | 12 +++-- main.go | 56 +++++++++++++++++++--- 5 files changed, 83 insertions(+), 25 deletions(-) diff --git a/controllers/controllers_suite_test.go b/controllers/controllers_suite_test.go index 7190ca789b..c1b1ea3fad 100644 --- a/controllers/controllers_suite_test.go +++ b/controllers/controllers_suite_test.go @@ -28,6 +28,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/controllers/remote" ctrl "sigs.k8s.io/controller-runtime" infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1" @@ -62,13 +63,26 @@ func setup() { testEnv = helpers.NewTestEnvironment() + tracker, err := remote.NewClusterCacheTracker( + testEnv.Manager, + remote.ClusterCacheTrackerOptions{ + SecretCachingClient: nil, + ControllerName: "", + Log: nil, + Indexes: []remote.Index{remote.NodeProviderIDIndex}, + }, + ) + if err != nil { + panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err)) + } + if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}); err != nil { panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err)) } if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}); err != nil { panic(fmt.Sprintf("unable to setup VsphereMachine controller: %v", err)) } - if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil { + if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker); err != nil { panic(fmt.Sprintf("unable to setup VsphereVM controller: %v", err)) } if err := AddVsphereClusterIdentityControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil { @@ -77,10 +91,10 @@ func setup() { if err := AddVSphereDeploymentZoneControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil { panic(fmt.Sprintf("unable to setup VSphereDeploymentZone controller: %v", err)) } - if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil { + if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker); err != nil { panic(fmt.Sprintf("unable to setup ServiceAccount controller: %v", err)) } - if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil { + if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker); err != nil { panic(fmt.Sprintf("unable to setup SvcDiscovery controller: %v", err)) } diff --git a/controllers/serviceaccount_controller.go b/controllers/serviceaccount_controller.go index 739855e208..1dfbaa88d9 100644 --- a/controllers/serviceaccount_controller.go +++ b/controllers/serviceaccount_controller.go @@ -64,7 +64,7 @@ const ( ) // AddServiceAccountProviderControllerToManager adds this controller to the provided manager. -func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error { +func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker) error { var ( controlledType = &vmwarev1.ProviderServiceAccount{} controlledTypeName = reflect.TypeOf(controlledType).Elem().Name() @@ -80,8 +80,8 @@ func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManager Logger: ctx.Logger.WithName(controllerNameShort), } r := ServiceAccountReconciler{ - ControllerContext: controllerContext, - remoteClientGetter: remote.NewClusterClient, + ControllerContext: controllerContext, + RemoteClusterCacheTracker: tracker, } clusterToInfraFn := clusterToSupervisorInfrastructureMapFunc(ctx) @@ -130,7 +130,7 @@ func clusterToSupervisorInfrastructureMapFunc(managerContext *context.Controller type ServiceAccountReconciler struct { *context.ControllerContext - remoteClientGetter remote.ClusterClientGetter + RemoteClusterCacheTracker *remote.ClusterCacheTracker } func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) { @@ -198,7 +198,7 @@ func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Reque // then just return a no-op and wait for the next sync. This will occur when // the Cluster's status is updated with a reference to the secret that has // the Kubeconfig data used to access the target cluster. - guestClient, err := r.remoteClientGetter(clusterContext, ProviderServiceAccountControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster)) + guestClient, err := r.RemoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster)) if err != nil { clusterContext.Logger.Info("The control plane is not ready yet", "err", err) return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil diff --git a/controllers/servicediscovery_controller.go b/controllers/servicediscovery_controller.go index b19447c931..706e4e07cf 100644 --- a/controllers/servicediscovery_controller.go +++ b/controllers/servicediscovery_controller.go @@ -70,7 +70,7 @@ const ( // +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get // AddServiceDiscoveryControllerToManager adds the ServiceDiscovery controller to the provided manager. -func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error { +func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker) error { var ( controllerNameShort = ServiceDiscoveryControllerName controllerNameLong = fmt.Sprintf("%s/%s/%s", ctx.Namespace, ctx.Name, ServiceDiscoveryControllerName) @@ -82,8 +82,8 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex Logger: ctx.Logger.WithName(controllerNameShort), } r := serviceDiscoveryReconciler{ - ControllerContext: controllerContext, - remoteClientGetter: remote.NewClusterClient, + ControllerContext: controllerContext, + RemoteClusterCacheTracker: tracker, } configMapCache, err := cache.New(mgr.GetConfig(), cache.Options{ @@ -124,7 +124,7 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex type serviceDiscoveryReconciler struct { *context.ControllerContext - remoteClientGetter remote.ClusterClientGetter + RemoteClusterCacheTracker *remote.ClusterCacheTracker } func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) { @@ -185,7 +185,7 @@ func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Req // We cannot proceed until we are able to access the target cluster. Until // then just return a no-op and wait for the next sync. - guestClient, err := r.remoteClientGetter(clusterContext, ServiceDiscoveryControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster)) + guestClient, err := r.RemoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster)) if err != nil { logger.Info("The control plane is not ready yet", "err", err) return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil diff --git a/controllers/vspherevm_controller.go b/controllers/vspherevm_controller.go index 51ed1291cb..17416ce542 100644 --- a/controllers/vspherevm_controller.go +++ b/controllers/vspherevm_controller.go @@ -69,7 +69,7 @@ import ( // AddVMControllerToManager adds the VM controller to the provided manager. // //nolint:forcetypeassert -func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error { +func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker) error { var ( controlledType = &infrav1.VSphereVM{} controlledTypeName = reflect.TypeOf(controlledType).Elem().Name() @@ -87,8 +87,9 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager Logger: ctx.Logger.WithName(controllerNameShort), } r := vmReconciler{ - ControllerContext: controllerContext, - VMService: &govmomi.VMService{}, + ControllerContext: controllerContext, + VMService: &govmomi.VMService{}, + RemoteClusterCacheTracker: tracker, } controller, err := ctrl.NewControllerManagedBy(mgr). // Watch the controlled, infrastructure resource. @@ -156,7 +157,8 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager type vmReconciler struct { *context.ControllerContext - VMService services.VirtualMachineService + VMService services.VirtualMachineService + RemoteClusterCacheTracker *remote.ClusterCacheTracker } // Reconcile ensures the back-end state reflects the Kubernetes resource state intent. @@ -366,7 +368,7 @@ func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) error { if err != nil { return err } - clusterClient, err := remote.NewClusterClient(ctx, r.ControllerContext.Name, r.Client, ctrlclient.ObjectKeyFromObject(cluster)) + clusterClient, err := r.RemoteClusterCacheTracker.GetClient(ctx, ctrlclient.ObjectKeyFromObject(cluster)) if err != nil { return err } diff --git a/main.go b/main.go index c24287b6ce..c3b6cae52f 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,8 @@ import ( "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/cluster-api/util/flags" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ctrlsig "sigs.k8s.io/controller-runtime/pkg/manager/signals" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -222,6 +224,46 @@ func main() { // Create a function that adds all the controllers and webhooks to the manager. addToManager := func(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error { + secretCachingClient, err := client.New(mgr.GetConfig(), client.Options{ + HTTPClient: mgr.GetHTTPClient(), + Cache: &client.CacheOptions{ + Reader: mgr.GetCache(), + }, + }) + if err != nil { + setupLog.Error(err, "unable to create secret caching client") + return err + } + + // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers + // requiring a connection to a remote cluster + log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker") + tracker, err := remote.NewClusterCacheTracker( + mgr, + remote.ClusterCacheTrackerOptions{ + SecretCachingClient: secretCachingClient, + ControllerName: controllerName, + Log: &log, + Indexes: []remote.Index{remote.NodeProviderIDIndex}, + }, + ) + if err != nil { + setupLog.Error(err, "unable to create cluster cache tracker") + return err + } + + // TODO: waiting for other PR to add related flags first + if err := (&remote.ClusterCacheReconciler{ + Client: mgr.GetClient(), + Tracker: tracker, + WatchFilterValue: "", + }).SetupWithManager(ctx, mgr, controller.Options{ + MaxConcurrentReconciles: managerOpts.MaxConcurrentReconciles, + }); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") + os.Exit(1) + } + // Check for non-supervisor VSphereCluster and start controller if found gvr := v1beta1.GroupVersion.WithResource(reflect.TypeOf(&v1beta1.VSphereCluster{}).Elem().Name()) isLoaded, err := isCRDDeployed(mgr, gvr) @@ -229,7 +271,7 @@ func main() { return err } if isLoaded { - if err := setupVAPIControllers(ctx, mgr); err != nil { + if err := setupVAPIControllers(ctx, mgr, tracker); err != nil { return fmt.Errorf("setupVAPIControllers: %w", err) } } else { @@ -243,7 +285,7 @@ func main() { return err } if isLoaded { - if err := setupSupervisorControllers(ctx, mgr); err != nil { + if err := setupSupervisorControllers(ctx, mgr, tracker); err != nil { return fmt.Errorf("setupSupervisorControllers: %w", err) } } else { @@ -290,7 +332,7 @@ func main() { defer session.Clear() } -func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error { +func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error { if err := (&v1beta1.VSphereClusterTemplate{}).SetupWebhookWithManager(mgr); err != nil { return err } @@ -321,7 +363,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man if err := controllers.AddMachineControllerToManager(ctx, mgr, &v1beta1.VSphereMachine{}); err != nil { return err } - if err := controllers.AddVMControllerToManager(ctx, mgr); err != nil { + if err := controllers.AddVMControllerToManager(ctx, mgr, tracker); err != nil { return err } if err := controllers.AddVsphereClusterIdentityControllerToManager(ctx, mgr); err != nil { @@ -331,7 +373,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man return controllers.AddVSphereDeploymentZoneControllerToManager(ctx, mgr) } -func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error { +func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error { if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1b1.VSphereCluster{}); err != nil { return err } @@ -340,11 +382,11 @@ func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlm return err } - if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr); err != nil { + if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr, tracker); err != nil { return err } - return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr) + return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr, tracker) } func setupChecks(mgr ctrlmgr.Manager) {