Merge pull request #3129 from JoelSpeed/mhc-integrate-remote-cache
🌱 Integrate shared remote cluster watching into MHC
k8s-ci-robot authored Jun 5, 2020
2 parents 35fdce2 + f31f0ce commit bfb2a3b
Showing 6 changed files with 138 additions and 96 deletions.
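
In short, this change removes the MachineHealthCheck controller's private per-cluster cache map (and the locking around it) and replaces it with the shared remote.ClusterCacheTracker, which owns remote caches and watches on behalf of all controllers. The sketch below illustrates the new pattern as it appears in the diff; it is an illustration only, not part of the change: SomeReconciler and nodeToRequests are placeholder names, and the handler signatures follow the controller-runtime release used by this PR.

// Illustrative sketch (assumptions noted above): a controller holds the shared
// ClusterCacheTracker and registers a watch on remote-cluster Nodes through it,
// instead of creating, starting, and locking its own per-cluster caches.
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/cluster-api/util"
)

type SomeReconciler struct {
	Client  client.Client
	Tracker *remote.ClusterCacheTracker

	controller controller.Controller
}

// watchClusterNodes asks the shared tracker to watch Nodes in the target cluster.
// The tracker owns the remote cache and its lifecycle, so the controller no longer
// tracks which clusters already have a watch set up.
func (r *SomeReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
	if r.Tracker == nil {
		// No tracker wired up (for example in some tests): skip remote watches.
		return nil
	}
	return r.Tracker.Watch(ctx, remote.WatchInput{
		Cluster:      util.ObjectKey(cluster),
		Watcher:      r.controller,
		Kind:         &corev1.Node{},
		CacheOptions: cache.Options{},
		EventHandler: &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.nodeToRequests)},
	})
}

// nodeToRequests maps a remote Node event back to reconcile requests; the real
// controller maps the Node to the MachineHealthChecks that select its Machine.
func (r *SomeReconciler) nodeToRequests(o handler.MapObject) []reconcile.Request {
	_ = o
	return nil
}
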
74 changes: 19 additions & 55 deletions controllers/machinehealthcheck_controller.go
@@ -19,7 +19,6 @@ package controllers
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
@@ -65,14 +64,13 @@ const (

// MachineHealthCheckReconciler reconciles a MachineHealthCheck object
type MachineHealthCheckReconciler struct {
Client client.Client
Log logr.Logger

controller controller.Controller
recorder record.EventRecorder
scheme *runtime.Scheme
clusterCaches map[client.ObjectKey]cache.Cache
clusterCachesLock sync.RWMutex
Client client.Client
Log logr.Logger
Tracker *remote.ClusterCacheTracker

controller controller.Controller
recorder record.EventRecorder
scheme *runtime.Scheme
}

func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
@@ -117,7 +115,6 @@ func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, option
r.controller = controller
r.recorder = mgr.GetEventRecorderFor("machinehealthcheck-controller")
r.scheme = mgr.GetScheme()
r.clusterCaches = make(map[client.ObjectKey]cache.Cache)
return nil
}

@@ -203,7 +200,7 @@ func (r *MachineHealthCheckReconciler) reconcile(ctx context.Context, cluster *c
return ctrl.Result{}, err
}

if err := r.watchClusterNodes(ctx, r.Client, cluster); err != nil {
if err := r.watchClusterNodes(ctx, cluster); err != nil {
logger.Error(err, "Error watching nodes on target cluster")
return ctrl.Result{}, err
}
@@ -376,54 +373,21 @@ func (r *MachineHealthCheckReconciler) getMachineFromNode(nodeName string) (*clu
return &machineList.Items[0], nil
}

func (r *MachineHealthCheckReconciler) watchClusterNodes(ctx context.Context, c client.Client, cluster *clusterv1.Cluster) error {
key := util.ObjectKey(cluster)
if _, ok := r.getClusterCache(key); ok {
// watch was already set up for this cluster
func (r *MachineHealthCheckReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
// If there is no tracker, don't watch remote nodes
if r.Tracker == nil {
return nil
}

return r.createClusterCache(ctx, c, key)
}

func (r *MachineHealthCheckReconciler) getClusterCache(key client.ObjectKey) (cache.Cache, bool) {
r.clusterCachesLock.RLock()
defer r.clusterCachesLock.RUnlock()

c, ok := r.clusterCaches[key]
return c, ok
}

func (r *MachineHealthCheckReconciler) createClusterCache(ctx context.Context, c client.Client, key client.ObjectKey) error {
r.clusterCachesLock.Lock()
defer r.clusterCachesLock.Unlock()

// Double check the key still doesn't exist under write lock
if _, ok := r.clusterCaches[key]; ok {
// An informer was created while waiting for the lock
return nil
if err := r.Tracker.Watch(ctx, remote.WatchInput{
Cluster: util.ObjectKey(cluster),
Watcher: r.controller,
Kind: &corev1.Node{},
CacheOptions: cache.Options{},
EventHandler: &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.nodeToMachineHealthCheck)},
}); err != nil {
return err
}

config, err := remote.RESTConfig(ctx, c, key)
if err != nil {
return errors.Wrap(err, "error fetching remote cluster config")
}

clusterCache, err := cache.New(config, cache.Options{})
if err != nil {
return errors.Wrap(err, "error creating cache for remote cluster")
}
go clusterCache.Start(ctx.Done())

err = r.controller.Watch(
source.NewKindWithCache(&corev1.Node{}, clusterCache),
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.nodeToMachineHealthCheck)},
)
if err != nil {
return errors.Wrap(err, "error watching nodes on target cluster")
}

r.clusterCaches[key] = clusterCache
return nil
}

69 changes: 62 additions & 7 deletions controllers/machinehealthcheck_controller_test.go
@@ -50,6 +50,7 @@ var _ = Describe("MachineHealthCheck Reconciler", func() {

var clusterName = "test-cluster"
var clusterKubeconfigName = "test-cluster-kubeconfig"
var clusterUID types.UID
var namespaceName string

BeforeEach(func() {
@@ -63,6 +64,7 @@ var _ = Describe("MachineHealthCheck Reconciler", func() {
By("Creating the Cluster")
testCluster.Namespace = namespaceName
Expect(testEnv.Create(ctx, testCluster)).To(Succeed())
clusterUID = testCluster.UID

By("Creating the remote Cluster kubeconfig")
Expect(testEnv.CreateKubeconfigSecret(testCluster)).To(Succeed())
@@ -216,6 +218,17 @@
}, timeout).Should(Succeed())
}

// getMHCStatus is a function to be used in Eventually() matchers to check the MHC status
getMHCStatus := func(namespace, name string) func() clusterv1.MachineHealthCheckStatus {
return func() clusterv1.MachineHealthCheckStatus {
mhc := &clusterv1.MachineHealthCheck{}
if err := testEnv.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, mhc); err != nil {
return clusterv1.MachineHealthCheckStatus{}
}
return mhc.Status
}
}

type reconcileTestCase struct {
mhc func() *clusterv1.MachineHealthCheck
nodes func() []*corev1.Node
@@ -316,13 +329,7 @@
Expect(testEnv.Create(ctx, mhc)).To(Succeed())

By("Verifying the status has been updated")
Eventually(func() clusterv1.MachineHealthCheckStatus {
mhc := &clusterv1.MachineHealthCheck{}
if err := testEnv.Get(ctx, types.NamespacedName{Namespace: namespaceName, Name: rtc.mhc().Name}, mhc); err != nil {
return clusterv1.MachineHealthCheckStatus{}
}
return mhc.Status
}, timeout).Should(Equal(rtc.expectedStatus))
Eventually(getMHCStatus(namespaceName, rtc.mhc().Name), timeout).Should(Equal(rtc.expectedStatus))

// Status has been updated, a reconcile has occurred, should no longer need async assertions

@@ -442,6 +449,54 @@
expectedStatus: clusterv1.MachineHealthCheckStatus{ExpectedMachines: 0, CurrentHealthy: 0},
}),
)

Context("when a remote Node is modified", func() {
It("should react to the updated Node", func() {
By("Creating a Node")
remoteNode := newTestNode("remote-node-1")
remoteNode.Status.Conditions = []corev1.NodeCondition{healthyNodeCondition}
Expect(testEnv.Create(ctx, remoteNode)).To(Succeed())

By("Creating a Machine")
// Set up the Machine to reduce events triggered by other controllers updating the Machine
clusterOR := metav1.OwnerReference{APIVersion: clusterv1.GroupVersion.String(), Kind: "Cluster", Name: clusterName, UID: clusterUID}
remoteMachine := newTestMachine("remote-machine-1", namespaceName, clusterName, remoteNode.Name, labels)
remoteMachine.SetOwnerReferences([]metav1.OwnerReference{machineSetOR, clusterOR})
now := metav1.NewTime(time.Now())
remoteMachine.SetFinalizers([]string{"machine.cluster.x-k8s.io"})
remoteMachine.Status.LastUpdated = &now
remoteMachine.Status.Phase = "Provisioned"
createMachine(remoteMachine)

By("Creating a MachineHealthCheck")
mhc := newTestMachineHealthCheck("remote-test-mhc", namespaceName, clusterName, labels)
maxUnhealthy := intstr.Parse("1")
mhc.Spec.MaxUnhealthy = &maxUnhealthy
mhc.Default()
Expect(testEnv.Create(ctx, mhc)).To(Succeed())

By("Verifying the status has been updated, and the machine is currently healthy")
Eventually(getMHCStatus(namespaceName, mhc.Name), timeout).Should(Equal(clusterv1.MachineHealthCheckStatus{ExpectedMachines: 1, CurrentHealthy: 1}))
// Make sure the status is stable before making any changes, this allows in-flight reconciles to finish
Consistently(getMHCStatus(namespaceName, mhc.Name), 100*time.Millisecond).Should(Equal(clusterv1.MachineHealthCheckStatus{ExpectedMachines: 1, CurrentHealthy: 1}))

By("Updating the node to make it unhealthy")
Eventually(func() error {
node := &corev1.Node{}
if err := testEnv.Get(ctx, util.ObjectKey(remoteNode), node); err != nil {
return err
}
node.Status.Conditions = []corev1.NodeCondition{unhealthyNodeCondition}
if err := testEnv.Status().Update(ctx, node); err != nil {
return err
}
return nil
}, timeout).Should(Succeed())

By("Verifying the status has been updated, and the machine is now unhealthy")
Eventually(getMHCStatus(namespaceName, mhc.Name), timeout).Should(Equal(clusterv1.MachineHealthCheckStatus{ExpectedMachines: 1, CurrentHealthy: 0}))
})
})
})
})

37 changes: 12 additions & 25 deletions controllers/remote/cluster_cache.go
@@ -366,47 +366,34 @@ func healthCheckPath(sourceCfg *rest.Config, requestTimeout time.Duration, path
// ClusterCacheReconciler is responsible for stopping remote cluster caches when
// the cluster for the remote cache is being deleted.
type ClusterCacheReconciler struct {
log logr.Logger
client client.Client
tracker *ClusterCacheTracker
Log logr.Logger
Client client.Client
Tracker *ClusterCacheTracker
}

func NewClusterCacheReconciler(
log logr.Logger,
mgr ctrl.Manager,
controllerOptions controller.Options,
cct *ClusterCacheTracker,
) (*ClusterCacheReconciler, error) {
r := &ClusterCacheReconciler{
log: log,
client: mgr.GetClient(),
tracker: cct,
}

// Watch Clusters so we can stop and remove caches when Clusters are deleted.
func (r *ClusterCacheReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
_, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.Cluster{}).
WithOptions(controllerOptions).
WithOptions(options).
Build(r)

if err != nil {
return nil, errors.Wrap(err, "failed to create cluster cache manager controller")
return errors.Wrap(err, "failed setting up with a controller manager")
}

return r, nil
return nil
}

// Reconcile reconciles Clusters and removes ClusterCaches for any Cluster that cannot be retrieved from the
// management cluster.
func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
ctx := context.Background()

log := r.log.WithValues("namespace", req.Namespace, "name", req.Name)
log := r.Log.WithValues("namespace", req.Namespace, "name", req.Name)
log.V(4).Info("Reconciling")

var cluster clusterv1.Cluster

err := r.client.Get(ctx, req.NamespacedName, &cluster)
err := r.Client.Get(ctx, req.NamespacedName, &cluster)
if err == nil {
log.V(4).Info("Cluster still exists")
return reconcile.Result{}, nil
@@ -417,7 +404,7 @@ func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Res

log.V(4).Info("Cluster no longer exists")

c := r.tracker.getClusterCache(req.NamespacedName)
c := r.Tracker.getClusterCache(req.NamespacedName)
if c == nil {
log.V(4).Info("No current cluster cache exists - nothing to do")
return reconcile.Result{}, nil
@@ -426,10 +413,10 @@ func (r *ClusterCacheReconciler) Reconcile(req reconcile.Request) (reconcile.Res
log.V(4).Info("Stopping cluster cache")
c.Stop()

r.tracker.deleteClusterCache(req.NamespacedName)
r.Tracker.deleteClusterCache(req.NamespacedName)

log.V(4).Info("Deleting watches for cluster cache")
r.tracker.deleteWatchesForCluster(req.NamespacedName)
r.Tracker.deleteWatchesForCluster(req.NamespacedName)

return reconcile.Result{}, nil
}
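
As a usage note, the reconciler is now constructed as a plain struct and registered through SetupWithManager, replacing the removed NewClusterCacheReconciler constructor. Below is a minimal wiring sketch, assuming a manager and shared tracker created by the caller (as the suite_test.go and main.go hunks further down do); setupClusterCacheReconciler is a hypothetical helper name, not part of this change.

package example

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"

	"sigs.k8s.io/cluster-api/controllers/remote"
)

// setupClusterCacheReconciler wires the ClusterCacheReconciler into the manager
// using the new plain-struct construction; the caller supplies the shared tracker.
func setupClusterCacheReconciler(mgr ctrl.Manager, tracker *remote.ClusterCacheTracker) error {
	r := &remote.ClusterCacheReconciler{
		Log:     ctrl.Log.WithName("remote").WithName("ClusterCacheReconciler"),
		Client:  mgr.GetClient(),
		Tracker: tracker,
	}
	return r.SetupWithManager(mgr, controller.Options{})
}
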
13 changes: 6 additions & 7 deletions controllers/remote/cluster_cache_reconciler_test.go
@@ -129,13 +129,12 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
Expect(k8sClient.Create(ctx, testNamespace)).To(Succeed())

By("Starting the ClusterCacheReconciler")
r, err := NewClusterCacheReconciler(
&log.NullLogger{},
mgr,
controller.Options{},
cct,
)
Expect(err).ToNot(HaveOccurred())
r := &ClusterCacheReconciler{
Log: &log.NullLogger{},
Client: mgr.GetClient(),
Tracker: cct,
}
Expect(r.SetupWithManager(mgr, controller.Options{})).To(Succeed())

By("Creating clusters to test with")
clusterRequest1, clusterCache1 = createAndWatchCluster("cluster-1")
16 changes: 16 additions & 0 deletions controllers/suite_test.go
@@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"

"sigs.k8s.io/cluster-api/cmd/clusterctl/log"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/test/helpers"
// +kubebuilder:scaffold:imports
)
@@ -66,6 +67,20 @@ var _ = BeforeSuite(func(done Done) {
testEnv, err = helpers.NewTestEnvironment()
Expect(err).NotTo(HaveOccurred())

// Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers
// requiring a connection to a remote cluster
tracker, err := remote.NewClusterCacheTracker(
log.Log,
testEnv.Manager,
)
Expect(err).ToNot(HaveOccurred())

Expect((&remote.ClusterCacheReconciler{
Client: testEnv,
Log: log.Log,
Tracker: tracker,
}).SetupWithManager(testEnv.Manager, controller.Options{MaxConcurrentReconciles: 1})).To(Succeed())

clusterReconciler = &ClusterReconciler{
Client: testEnv,
Log: log.Log,
@@ -90,6 +105,7 @@ var _ = BeforeSuite(func(done Done) {
Expect((&MachineHealthCheckReconciler{
Client: testEnv,
Log: log.Log,
Tracker: tracker,
recorder: testEnv.GetEventRecorderFor("machinehealthcheck-controller"),
}).SetupWithManager(testEnv.Manager, controller.Options{MaxConcurrentReconciles: 1})).To(Succeed())

25 changes: 23 additions & 2 deletions main.go
@@ -33,6 +33,7 @@ import (
clusterv1alpha3 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/cmd/version"
"sigs.k8s.io/cluster-api/controllers"
"sigs.k8s.io/cluster-api/controllers/remote"
expv1alpha3 "sigs.k8s.io/cluster-api/exp/api/v1alpha3"
expcontrollers "sigs.k8s.io/cluster-api/exp/controllers"
"sigs.k8s.io/cluster-api/feature"
@@ -194,6 +195,25 @@ func setupReconcilers(mgr ctrl.Manager) {
return
}

// Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers
// requiring a connection to a remote cluster
tracker, err := remote.NewClusterCacheTracker(
ctrl.Log.WithName("remote").WithName("ClusterCacheTracker"),
mgr,
)
if err != nil {
setupLog.Error(err, "unable to create cluster cache tracker")
os.Exit(1)
}
if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("remote").WithName("ClusterCacheReconciler"),
Tracker: tracker,
}).SetupWithManager(mgr, concurrency(clusterConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}

if err := (&controllers.ClusterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Cluster"),
@@ -233,8 +253,9 @@ }
}
}
if err := (&controllers.MachineHealthCheckReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("MachineHealthCheck"),
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("MachineHealthCheck"),
Tracker: tracker,
}).SetupWithManager(mgr, concurrency(machineHealthCheckConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachineHealthCheck")
os.Exit(1)
