From f31f0cea32d03b158b381ffb330c5b61b87cd82d Mon Sep 17 00:00:00 2001 From: Joel Speed Date: Tue, 2 Jun 2020 17:40:57 +0100 Subject: [PATCH] Integrate shared remote cluster watching into MHC --- controllers/machinehealthcheck_controller.go | 74 +++++-------------- .../machinehealthcheck_controller_test.go | 69 +++++++++++++++-- controllers/remote/cluster_cache.go | 37 +++------- .../remote/cluster_cache_reconciler_test.go | 13 ++-- controllers/suite_test.go | 16 ++++ main.go | 25 ++++++- 6 files changed, 138 insertions(+), 96 deletions(-) diff --git a/controllers/machinehealthcheck_controller.go b/controllers/machinehealthcheck_controller.go index f6de9b728abf..daae1ca00f0f 100644 --- a/controllers/machinehealthcheck_controller.go +++ b/controllers/machinehealthcheck_controller.go @@ -19,7 +19,6 @@ package controllers import ( "context" "fmt" - "sync" "time" "github.com/go-logr/logr" @@ -65,14 +64,13 @@ const ( // MachineHealthCheckReconciler reconciles a MachineHealthCheck object type MachineHealthCheckReconciler struct { - Client client.Client - Log logr.Logger - - controller controller.Controller - recorder record.EventRecorder - scheme *runtime.Scheme - clusterCaches map[client.ObjectKey]cache.Cache - clusterCachesLock sync.RWMutex + Client client.Client + Log logr.Logger + Tracker *remote.ClusterCacheTracker + + controller controller.Controller + recorder record.EventRecorder + scheme *runtime.Scheme } func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { @@ -117,7 +115,6 @@ func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, option r.controller = controller r.recorder = mgr.GetEventRecorderFor("machinehealthcheck-controller") r.scheme = mgr.GetScheme() - r.clusterCaches = make(map[client.ObjectKey]cache.Cache) return nil } @@ -203,7 +200,7 @@ func (r *MachineHealthCheckReconciler) reconcile(ctx context.Context, cluster *c return ctrl.Result{}, err } - if err := r.watchClusterNodes(ctx, r.Client, cluster); err != nil { + if err := r.watchClusterNodes(ctx, cluster); err != nil { logger.Error(err, "Error watching nodes on target cluster") return ctrl.Result{}, err } @@ -376,54 +373,21 @@ func (r *MachineHealthCheckReconciler) getMachineFromNode(nodeName string) (*clu return &machineList.Items[0], nil } -func (r *MachineHealthCheckReconciler) watchClusterNodes(ctx context.Context, c client.Client, cluster *clusterv1.Cluster) error { - key := util.ObjectKey(cluster) - if _, ok := r.getClusterCache(key); ok { - // watch was already set up for this cluster +func (r *MachineHealthCheckReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error { + // If there is no tracker, don't watch remote nodes + if r.Tracker == nil { return nil } - return r.createClusterCache(ctx, c, key) -} - -func (r *MachineHealthCheckReconciler) getClusterCache(key client.ObjectKey) (cache.Cache, bool) { - r.clusterCachesLock.RLock() - defer r.clusterCachesLock.RUnlock() - - c, ok := r.clusterCaches[key] - return c, ok -} - -func (r *MachineHealthCheckReconciler) createClusterCache(ctx context.Context, c client.Client, key client.ObjectKey) error { - r.clusterCachesLock.Lock() - defer r.clusterCachesLock.Unlock() - - // Double check the key still doesn't exist under write lock - if _, ok := r.clusterCaches[key]; ok { - // An informer was created while waiting for the lock - return nil + if err := r.Tracker.Watch(ctx, remote.WatchInput{ + Cluster: util.ObjectKey(cluster), + Watcher: r.controller, + Kind: 
&corev1.Node{}, + CacheOptions: cache.Options{}, + EventHandler: &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.nodeToMachineHealthCheck)}, + }); err != nil { + return err } - - config, err := remote.RESTConfig(ctx, c, key) - if err != nil { - return errors.Wrap(err, "error fetching remote cluster config") - } - - clusterCache, err := cache.New(config, cache.Options{}) - if err != nil { - return errors.Wrap(err, "error creating cache for remote cluster") - } - go clusterCache.Start(ctx.Done()) - - err = r.controller.Watch( - source.NewKindWithCache(&corev1.Node{}, clusterCache), - &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.nodeToMachineHealthCheck)}, - ) - if err != nil { - return errors.Wrap(err, "error watching nodes on target cluster") - } - - r.clusterCaches[key] = clusterCache return nil } diff --git a/controllers/machinehealthcheck_controller_test.go b/controllers/machinehealthcheck_controller_test.go index dcda73fc8c8f..caa22b06f702 100644 --- a/controllers/machinehealthcheck_controller_test.go +++ b/controllers/machinehealthcheck_controller_test.go @@ -50,6 +50,7 @@ var _ = Describe("MachineHealthCheck Reconciler", func() { var clusterName = "test-cluster" var clusterKubeconfigName = "test-cluster-kubeconfig" + var clusterUID types.UID var namespaceName string BeforeEach(func() { @@ -63,6 +64,7 @@ var _ = Describe("MachineHealthCheck Reconciler", func() { By("Creating the Cluster") testCluster.Namespace = namespaceName Expect(testEnv.Create(ctx, testCluster)).To(Succeed()) + clusterUID = testCluster.UID By("Creating the remote Cluster kubeconfig") Expect(testEnv.CreateKubeconfigSecret(testCluster)).To(Succeed()) @@ -216,6 +218,17 @@ var _ = Describe("MachineHealthCheck Reconciler", func() { }, timeout).Should(Succeed()) } + // getMHCStatus is a function to be used in Eventually() matchers to check the MHC status + getMHCStatus := func(namespace, name string) func() clusterv1.MachineHealthCheckStatus { + return func() clusterv1.MachineHealthCheckStatus { + mhc := &clusterv1.MachineHealthCheck{} + if err := testEnv.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, mhc); err != nil { + return clusterv1.MachineHealthCheckStatus{} + } + return mhc.Status + } + } + type reconcileTestCase struct { mhc func() *clusterv1.MachineHealthCheck nodes func() []*corev1.Node @@ -316,13 +329,7 @@ var _ = Describe("MachineHealthCheck Reconciler", func() { Expect(testEnv.Create(ctx, mhc)).To(Succeed()) By("Verifying the status has been updated") - Eventually(func() clusterv1.MachineHealthCheckStatus { - mhc := &clusterv1.MachineHealthCheck{} - if err := testEnv.Get(ctx, types.NamespacedName{Namespace: namespaceName, Name: rtc.mhc().Name}, mhc); err != nil { - return clusterv1.MachineHealthCheckStatus{} - } - return mhc.Status - }, timeout).Should(Equal(rtc.expectedStatus)) + Eventually(getMHCStatus(namespaceName, rtc.mhc().Name), timeout).Should(Equal(rtc.expectedStatus)) // Status has been updated, a reconcile has occurred, should no longer need async assertions @@ -442,6 +449,54 @@ var _ = Describe("MachineHealthCheck Reconciler", func() { expectedStatus: clusterv1.MachineHealthCheckStatus{ExpectedMachines: 0, CurrentHealthy: 0}, }), ) + + Context("when a remote Node is modified", func() { + It("should react to the updated Node", func() { + By("Creating a Node") + remoteNode := newTestNode("remote-node-1") + remoteNode.Status.Conditions = []corev1.NodeCondition{healthyNodeCondition} + Expect(testEnv.Create(ctx, 
remoteNode)).To(Succeed())
+
+			By("Creating a Machine")
+			// Set up the Machine fully to reduce events triggered by other controllers updating it
+			clusterOR := metav1.OwnerReference{APIVersion: clusterv1.GroupVersion.String(), Kind: "Cluster", Name: clusterName, UID: clusterUID}
+			remoteMachine := newTestMachine("remote-machine-1", namespaceName, clusterName, remoteNode.Name, labels)
+			remoteMachine.SetOwnerReferences([]metav1.OwnerReference{machineSetOR, clusterOR})
+			now := metav1.NewTime(time.Now())
+			remoteMachine.SetFinalizers([]string{"machine.cluster.x-k8s.io"})
+			remoteMachine.Status.LastUpdated = &now
+			remoteMachine.Status.Phase = "Provisioned"
+			createMachine(remoteMachine)
+
+			By("Creating a MachineHealthCheck")
+			mhc := newTestMachineHealthCheck("remote-test-mhc", namespaceName, clusterName, labels)
+			maxUnhealthy := intstr.Parse("1")
+			mhc.Spec.MaxUnhealthy = &maxUnhealthy
+			mhc.Default()
+			Expect(testEnv.Create(ctx, mhc)).To(Succeed())
+
+			By("Verifying the status has been updated, and the machine is currently healthy")
+			Eventually(getMHCStatus(namespaceName, mhc.Name), timeout).Should(Equal(clusterv1.MachineHealthCheckStatus{ExpectedMachines: 1, CurrentHealthy: 1}))
+			// Make sure the status is stable before making any changes; this allows in-flight reconciles to finish
+			Consistently(getMHCStatus(namespaceName, mhc.Name), 100*time.Millisecond).Should(Equal(clusterv1.MachineHealthCheckStatus{ExpectedMachines: 1, CurrentHealthy: 1}))
+
+			By("Updating the node to make it unhealthy")
+			Eventually(func() error {
+				node := &corev1.Node{}
+				if err := testEnv.Get(ctx, util.ObjectKey(remoteNode), node); err != nil {
+					return err
+				}
+				node.Status.Conditions = []corev1.NodeCondition{unhealthyNodeCondition}
+				if err := testEnv.Status().Update(ctx, node); err != nil {
+					return err
+				}
+				return nil
+			}, timeout).Should(Succeed())
+
+			By("Verifying the status has been updated, and the machine is now unhealthy")
+			Eventually(getMHCStatus(namespaceName, mhc.Name), timeout).Should(Equal(clusterv1.MachineHealthCheckStatus{ExpectedMachines: 1, CurrentHealthy: 0}))
+		})
+	})
 	})
 })
diff --git a/controllers/remote/cluster_cache.go b/controllers/remote/cluster_cache.go
index 7e82d4636421..d20308c393b8 100644
--- a/controllers/remote/cluster_cache.go
+++ b/controllers/remote/cluster_cache.go
@@ -366,34 +366,21 @@ func healthCheckPath(sourceCfg *rest.Config, requestTimeout time.Duration, path
 // ClusterCacheReconciler is responsible for stopping remote cluster caches when
 // the cluster for the remote cache is being deleted.
 type ClusterCacheReconciler struct {
-	log     logr.Logger
-	client  client.Client
-	tracker *ClusterCacheTracker
+	Log     logr.Logger
+	Client  client.Client
+	Tracker *ClusterCacheTracker
 }
 
-func NewClusterCacheReconciler(
-	log logr.Logger,
-	mgr ctrl.Manager,
-	controllerOptions controller.Options,
-	cct *ClusterCacheTracker,
-) (*ClusterCacheReconciler, error) {
-	r := &ClusterCacheReconciler{
-		log:     log,
-		client:  mgr.GetClient(),
-		tracker: cct,
-	}
-
-	// Watch Clusters so we can stop and remove caches when Clusters are deleted.
+func (r *ClusterCacheReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
 	_, err := ctrl.NewControllerManagedBy(mgr).
 		For(&clusterv1.Cluster{}).
-		WithOptions(controllerOptions).
+		WithOptions(options).
Build(r) if err != nil { - return nil, errors.Wrap(err, "failed to create cluster cache manager controller") + return errors.Wrap(err, "failed setting up with a controller manager") } - - return r, nil + return nil } // Reconcile reconciles Clusters and removes ClusterCaches for any Cluster that cannot be retrieved from the @@ -401,12 +388,12 @@ func NewClusterCacheReconciler( func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { ctx := context.Background() - log := r.log.WithValues("namespace", req.Namespace, "name", req.Name) + log := r.Log.WithValues("namespace", req.Namespace, "name", req.Name) log.V(4).Info("Reconciling") var cluster clusterv1.Cluster - err := r.client.Get(ctx, req.NamespacedName, &cluster) + err := r.Client.Get(ctx, req.NamespacedName, &cluster) if err == nil { log.V(4).Info("Cluster still exists") return reconcile.Result{}, nil @@ -417,7 +404,7 @@ func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Res log.V(4).Info("Cluster no longer exists") - c := r.tracker.getClusterCache(req.NamespacedName) + c := r.Tracker.getClusterCache(req.NamespacedName) if c == nil { log.V(4).Info("No current cluster cache exists - nothing to do") return reconcile.Result{}, nil @@ -426,10 +413,10 @@ func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Res log.V(4).Info("Stopping cluster cache") c.Stop() - r.tracker.deleteClusterCache(req.NamespacedName) + r.Tracker.deleteClusterCache(req.NamespacedName) log.V(4).Info("Deleting watches for cluster cache") - r.tracker.deleteWatchesForCluster(req.NamespacedName) + r.Tracker.deleteWatchesForCluster(req.NamespacedName) return reconcile.Result{}, nil } diff --git a/controllers/remote/cluster_cache_reconciler_test.go b/controllers/remote/cluster_cache_reconciler_test.go index 78aa33753db7..0ecdd1bcdc42 100644 --- a/controllers/remote/cluster_cache_reconciler_test.go +++ b/controllers/remote/cluster_cache_reconciler_test.go @@ -129,13 +129,12 @@ var _ = Describe("ClusterCache Reconciler suite", func() { Expect(k8sClient.Create(ctx, testNamespace)).To(Succeed()) By("Starting the ClusterCacheReconciler") - r, err := NewClusterCacheReconciler( - &log.NullLogger{}, - mgr, - controller.Options{}, - cct, - ) - Expect(err).ToNot(HaveOccurred()) + r := &ClusterCacheReconciler{ + Log: &log.NullLogger{}, + Client: mgr.GetClient(), + Tracker: cct, + } + Expect(r.SetupWithManager(mgr, controller.Options{})).To(Succeed()) By("Creating clusters to test with") clusterRequest1, clusterCache1 = createAndWatchCluster("cluster-1") diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 8f7a7c706db8..eb678b3e860a 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest/printer" "sigs.k8s.io/cluster-api/cmd/clusterctl/log" + "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/cluster-api/test/helpers" // +kubebuilder:scaffold:imports ) @@ -66,6 +67,20 @@ var _ = BeforeSuite(func(done Done) { testEnv, err = helpers.NewTestEnvironment() Expect(err).NotTo(HaveOccurred()) + // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers + // requiring a connection to a remote cluster + tracker, err := remote.NewClusterCacheTracker( + log.Log, + testEnv.Manager, + ) + Expect(err).ToNot(HaveOccurred()) + + Expect((&remote.ClusterCacheReconciler{ + Client: testEnv, + Log: log.Log, + Tracker: tracker, + }).SetupWithManager(testEnv.Manager, 
controller.Options{MaxConcurrentReconciles: 1})).To(Succeed()) + clusterReconciler = &ClusterReconciler{ Client: testEnv, Log: log.Log, @@ -90,6 +105,7 @@ var _ = BeforeSuite(func(done Done) { Expect((&MachineHealthCheckReconciler{ Client: testEnv, Log: log.Log, + Tracker: tracker, recorder: testEnv.GetEventRecorderFor("machinehealthcheck-controller"), }).SetupWithManager(testEnv.Manager, controller.Options{MaxConcurrentReconciles: 1})).To(Succeed()) diff --git a/main.go b/main.go index d6818c4964d7..a21a13c11ad3 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ import ( clusterv1alpha3 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/cmd/version" "sigs.k8s.io/cluster-api/controllers" + "sigs.k8s.io/cluster-api/controllers/remote" expv1alpha3 "sigs.k8s.io/cluster-api/exp/api/v1alpha3" expcontrollers "sigs.k8s.io/cluster-api/exp/controllers" "sigs.k8s.io/cluster-api/feature" @@ -194,6 +195,25 @@ func setupReconcilers(mgr ctrl.Manager) { return } + // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers + // requiring a connection to a remote cluster + tracker, err := remote.NewClusterCacheTracker( + ctrl.Log.WithName("remote").WithName("ClusterCacheTracker"), + mgr, + ) + if err != nil { + setupLog.Error(err, "unable to create cluster cache tracker") + os.Exit(1) + } + if err := (&remote.ClusterCacheReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("remote").WithName("ClusterCacheReconciler"), + Tracker: tracker, + }).SetupWithManager(mgr, concurrency(clusterConcurrency)); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") + os.Exit(1) + } + if err := (&controllers.ClusterReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Cluster"), @@ -233,8 +253,9 @@ func setupReconcilers(mgr ctrl.Manager) { } } if err := (&controllers.MachineHealthCheckReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("MachineHealthCheck"), + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("MachineHealthCheck"), + Tracker: tracker, }).SetupWithManager(mgr, concurrency(machineHealthCheckConcurrency)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "MachineHealthCheck") os.Exit(1)
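
Usage sketch (illustrative note, not part of the patch): after this change a controller no longer builds or caches its own remote-cluster informers. It receives the shared *remote.ClusterCacheTracker that main.go (and suite_test.go) construct once per manager, and calls Tracker.Watch for each cluster it reconciles; the old per-controller clusterCaches map and RWMutex disappear because the patch relies on the tracker to deduplicate watch requests, which is why watchClusterNodes can now run on every reconcile. A minimal consumer following the same pattern could look like the Go sketch below; FooReconciler, fooToRequests, and package example are hypothetical names, while the remote.WatchInput fields are exactly the ones used by the MHC above:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/cluster-api/util"
)

// FooReconciler is a hypothetical controller that needs events from a
// workload cluster; the fields mirror MachineHealthCheckReconciler above.
type FooReconciler struct {
	Client     client.Client
	Tracker    *remote.ClusterCacheTracker
	controller controller.Controller
}

// watchClusterNodes follows the MHC pattern: it is called from Reconcile
// and relies on the shared tracker to avoid creating duplicate watches.
func (r *FooReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
	if r.Tracker == nil {
		// No tracker wired in, e.g. a test setup without remote clusters.
		return nil
	}
	return r.Tracker.Watch(ctx, remote.WatchInput{
		Cluster:      util.ObjectKey(cluster),
		Watcher:      r.controller,
		Kind:         &corev1.Node{},
		CacheOptions: cache.Options{},
		EventHandler: &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.fooToRequests)},
	})
}

// fooToRequests is a stub mapper; a real controller would translate the
// remote Node event into requests for the objects it reconciles.
func (r *FooReconciler) fooToRequests(o handler.MapObject) []reconcile.Request {
	return nil
}

The corresponding teardown lives in ClusterCacheReconciler: when a Cluster can no longer be retrieved, its Reconcile stops the remote cache and deletes the tracker's watches for that cluster, so consumers never manage informer lifecycles themselves.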