Skip to content

Commit

Permalink
clustercache: add typed watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ykakarap committed Nov 11, 2024
1 parent 0202f9c commit 1c26e63
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 40 deletions.
25 changes: 12 additions & 13 deletions controllers/clustercache/cluster_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"

"sigs.k8s.io/cluster-api/util/certs"
)
Expand Down Expand Up @@ -407,16 +406,16 @@ func (ca *clusterAccessor) GetClientCertificatePrivateKey(ctx context.Context) *
}

// Watch watches a workload cluster for events.
// Each unique watch (by input.Name) is only added once after a Connect (otherwise we return early).
// Each unique watch (by watcher.Name()) is only added once after a Connect (otherwise we return early).
// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
// After a re-connect watches will be re-added (assuming the Watch method is called again).
func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
if input.Name == "" {
return errors.New("input.Name is required")
func (ca *clusterAccessor) Watch(ctx context.Context, watcher Watcher) error {
if watcher.Name() == "" {
return errors.New("watcher.Name() cannot be empty")
}

if !ca.Connected(ctx) {
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}

log := ctrl.LoggerFrom(ctx)
Expand All @@ -429,21 +428,21 @@ func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {

// Checking connection again while holding the lock, because maybe Disconnect was called since checking above.
if ca.lockedState.connection == nil {
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}

// Return early if the watch was already added.
if ca.lockedState.connection.watches.Has(input.Name) {
log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", input.Name, input.Kind))
if ca.lockedState.connection.watches.Has(watcher.Name()) {
log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", watcher.Name(), watcher.Object()))
return nil
}

log.Info(fmt.Sprintf("Creating watch %s for %T", input.Name, input.Kind))
if err := input.Watcher.Watch(source.Kind(ca.lockedState.connection.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
return errors.Wrapf(err, "error creating watch %s for %T", input.Name, input.Kind)
log.Info(fmt.Sprintf("Creating watch %s for %T", watcher.Name(), watcher.Object()))
if err := watcher.Watch(ca.lockedState.connection.cache); err != nil {
return errors.Wrapf(err, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}

ca.lockedState.connection.watches.Insert(input.Name)
ca.lockedState.connection.watches.Insert(watcher.Name())
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions controllers/clustercache/cluster_accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,15 @@ func TestWatch(t *testing.T) {
accessor := newClusterAccessor(clusterKey, config)

tw := &testWatcher{}
wi := WatchInput{
wi := WatcherOptions{
Name: "test-watch",
Watcher: tw,
Kind: &corev1.Node{},
EventHandler: &handler.EnqueueRequestForObject{},
}

// Add watch when not connected (fails)
err := accessor.Watch(ctx, wi)
err := accessor.Watch(ctx, NewWatcher(wi))
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())

Expand All @@ -346,12 +346,12 @@ func TestWatch(t *testing.T) {
g.Expect(accessor.lockedState.connection.watches).To(BeEmpty())

// Add watch
g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))

// Add watch again (no-op as watch already exists)
g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))

Expand Down
70 changes: 55 additions & 15 deletions controllers/clustercache/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type ClusterCache interface {
// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
// After a re-connect watches will be re-added (assuming the Watch method is called again).
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error
Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error

// GetLastProbeSuccessTimestamp returns the time when the health probe was successfully executed last.
GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time
Expand All @@ -169,34 +169,74 @@ type ClusterCache interface {
// because there is no connection to the workload cluster.
var ErrClusterNotConnected = errors.New("connection to the workload cluster is down")

// Watcher is a scoped-down interface from Controller that only has the Watch func.
// Watcher is an interface that can start a Watch.
type Watcher interface {
// Watch watches the provided Source.
Watch(src source.Source) error
Name() string
Object() client.Object
Watch(cache cache.Cache) error
}

// WatchInput specifies the parameters used to establish a new watch for a workload cluster.
// A source.Kind source (configured with Kind, EventHandler and Predicates) will be added to the Watcher.
// To watch for events, the source.Kind will create an informer on the Cache that we have created and cached
// SourceWatcher is a scoped-down interface from Controller that only has the Watch func.
type SourceWatcher[request comparable] interface {
Watch(src source.TypedSource[request]) error
}

// WatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
// for the given Cluster.
type WatcherOptions = TypedWatcherOptions[client.Object, ctrl.Request]

// TypedWatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
// for the given Cluster.
type WatchInput struct {
type TypedWatcherOptions[object client.Object, request comparable] struct {
// Name represents a unique Watch request for the specified Cluster.
// The name is used to track that a specific watch is only added once to a cache.
// After a connection (and thus also the cache) has been re-created, watches have to be added
// again by calling the Watch method again.
Name string

// Watcher is the watcher (controller) whose Reconcile() function will be called for events.
Watcher Watcher
Watcher SourceWatcher[request]

// Kind is the type of resource to watch.
Kind client.Object
Kind object

// EventHandler contains the event handlers to invoke for resource events.
EventHandler handler.EventHandler
EventHandler handler.TypedEventHandler[object, request]

// Predicates is used to filter resource events.
Predicates []predicate.Predicate
Predicates []predicate.TypedPredicate[object]
}

// NewWatcher creates a Watcher for the workload cluster.
// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the SourceWatcher.
// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
// for the given Cluster.
func NewWatcher[object client.Object, request comparable](options TypedWatcherOptions[object, request]) Watcher {
return &watcher[object, request]{
name: options.Name,
kind: options.Kind,
eventHandler: options.EventHandler,
predicates: options.Predicates,
watcher: options.Watcher,
}
}

type watcher[object client.Object, request comparable] struct {
name string
kind object
eventHandler handler.TypedEventHandler[object, request]
predicates []predicate.TypedPredicate[object]
watcher SourceWatcher[request]
}

func (tw *watcher[object, request]) Name() string { return tw.name }
func (tw *watcher[object, request]) Object() client.Object { return tw.kind }
func (tw *watcher[object, request]) Watch(cache cache.Cache) error {
return tw.watcher.Watch(source.TypedKind[object, request](cache, tw.kind, tw.eventHandler, tw.predicates...))
}

// GetClusterSourceOption is an option that modifies GetClusterSourceOptions for a GetClusterSource call.
Expand Down Expand Up @@ -342,12 +382,12 @@ func (cc *clusterCache) GetClientCertificatePrivateKey(ctx context.Context, clus
return accessor.GetClientCertificatePrivateKey(ctx), nil
}

func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error {
func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error {
accessor := cc.getClusterAccessor(cluster)
if accessor == nil {
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}
return accessor.Watch(ctx, input)
return accessor.Watch(ctx, watcher)
}

func (cc *clusterCache) GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time {
Expand Down
4 changes: 2 additions & 2 deletions exp/internal/controllers/machinepool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,12 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *
return nil
}

return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "machinepool-watchNodes",
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
})
}))
}

func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, o client.Object) []reconcile.Request {
Expand Down
4 changes: 2 additions & 2 deletions internal/controllers/machine/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,12 +1045,12 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C
return nil
}

return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "machine-watchNodes",
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachine),
})
}))
}

func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []reconcile.Request {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,14 @@ func TestGetNode(t *testing.T) {

// Retry because the ClusterCache might not have immediately created the clusterAccessor.
g.Eventually(func(g Gomega) {
g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.WatchInput{
g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "TestGetNode",
Watcher: w,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(func(context.Context, client.Object) []reconcile.Request {
return nil
}),
})).To(Succeed())
}))).To(Succeed())
}, 1*time.Minute, 5*time.Second).Should(Succeed())

for _, tc := range testCases {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,12 @@ func (r *Reconciler) nodeToMachineHealthCheck(ctx context.Context, o client.Obje
}

func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "machinehealthcheck-watchClusterNodes",
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck),
})
}))
}

// getMachineFromNode retrieves the machine with a nodeRef to nodeName
Expand Down

0 comments on commit 1c26e63

Please sign in to comment.