diff --git a/controllers/clustercache/cluster_accessor.go b/controllers/clustercache/cluster_accessor.go
index 1e304d7c28be..ff24b0d6e1e2 100644
--- a/controllers/clustercache/cluster_accessor.go
+++ b/controllers/clustercache/cluster_accessor.go
@@ -32,7 +32,6 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	"sigs.k8s.io/cluster-api/util/certs"
 )
@@ -407,16 +406,16 @@ func (ca *clusterAccessor) GetClientCertificatePrivateKey(ctx context.Context) *
 }
 
 // Watch watches a workload cluster for events.
-// Each unique watch (by input.Name) is only added once after a Connect (otherwise we return early).
+// Each unique watch (by watcher.Name()) is only added once after a Connect (otherwise we return early).
 // During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
 // After a re-connect watches will be re-added (assuming the Watch method is called again).
-func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
-	if input.Name == "" {
-		return errors.New("input.Name is required")
+func (ca *clusterAccessor) Watch(ctx context.Context, watcher Watcher) error {
+	if watcher.Name() == "" {
+		return errors.New("watcher.Name() cannot be empty")
 	}
 
 	if !ca.Connected(ctx) {
-		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
 	}
 
 	log := ctrl.LoggerFrom(ctx)
@@ -429,21 +428,21 @@ func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
 
 	// Checking connection again while holding the lock, because maybe Disconnect was called since checking above.
 	if ca.lockedState.connection == nil {
-		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
 	}
 
 	// Return early if the watch was already added.
-	if ca.lockedState.connection.watches.Has(input.Name) {
-		log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", input.Name, input.Kind))
+	if ca.lockedState.connection.watches.Has(watcher.Name()) {
+		log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", watcher.Name(), watcher.Object()))
 		return nil
 	}
 
-	log.Info(fmt.Sprintf("Creating watch %s for %T", input.Name, input.Kind))
-	if err := input.Watcher.Watch(source.Kind(ca.lockedState.connection.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
-		return errors.Wrapf(err, "error creating watch %s for %T", input.Name, input.Kind)
+	log.Info(fmt.Sprintf("Creating watch %s for %T", watcher.Name(), watcher.Object()))
+	if err := watcher.Watch(ca.lockedState.connection.cache); err != nil {
+		return errors.Wrapf(err, "error creating watch %s for %T", watcher.Name(), watcher.Object())
 	}
 
-	ca.lockedState.connection.watches.Insert(input.Name)
+	ca.lockedState.connection.watches.Insert(watcher.Name())
 
 	return nil
 }
diff --git a/controllers/clustercache/cluster_accessor_test.go b/controllers/clustercache/cluster_accessor_test.go
index 0a8cef6d9754..4e48df37cd0d 100644
--- a/controllers/clustercache/cluster_accessor_test.go
+++ b/controllers/clustercache/cluster_accessor_test.go
@@ -327,7 +327,7 @@ func TestWatch(t *testing.T) {
 	accessor := newClusterAccessor(clusterKey, config)
 
 	tw := &testWatcher{}
-	wi := WatchInput{
+	wi := WatcherOptions{
 		Name: "test-watch",
 		Watcher: tw,
 		Kind: &corev1.Node{},
 	}
 
 	// Add watch when not connected (fails)
-	err := accessor.Watch(ctx, wi)
+	err := accessor.Watch(ctx, NewWatcher(wi))
 	g.Expect(err).To(HaveOccurred())
 	g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())
@@ -346,12 +346,12 @@
 	g.Expect(accessor.lockedState.connection.watches).To(BeEmpty())
 
 	// Add watch
-	g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
+	g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
 	g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
 	g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))
 
 	// Add watch again (no-op as watch already exists)
-	g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
+	g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
 	g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
 	g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))
diff --git a/controllers/clustercache/cluster_cache.go b/controllers/clustercache/cluster_cache.go
index ddb59532ee67..f907e252569d 100644
--- a/controllers/clustercache/cluster_cache.go
+++ b/controllers/clustercache/cluster_cache.go
@@ -149,7 +149,7 @@ type ClusterCache interface {
 	// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
 	// After a re-connect watches will be re-added (assuming the Watch method is called again).
 	// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
-	Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error
+	Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error
 
 	// GetLastProbeSuccessTimestamp returns the time when the health probe was successfully executed last.
 	GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time
@@ -169,17 +169,29 @@ type ClusterCache interface {
 // because there is no connection to the workload cluster.
 var ErrClusterNotConnected = errors.New("connection to the workload cluster is down")
 
-// Watcher is a scoped-down interface from Controller that only has the Watch func.
+// Watcher is an interface that can start a Watch.
 type Watcher interface {
-	// Watch watches the provided Source.
-	Watch(src source.Source) error
+	Name() string
+	Object() client.Object
+	Watch(cache cache.Cache) error
 }
 
-// WatchInput specifies the parameters used to establish a new watch for a workload cluster.
-// A source.Kind source (configured with Kind, EventHandler and Predicates) will be added to the Watcher.
-// To watch for events, the source.Kind will create an informer on the Cache that we have created and cached
+// SourceWatcher is a scoped-down interface from Controller that only has the Watch func.
+type SourceWatcher[request comparable] interface {
+	Watch(src source.TypedSource[request]) error
+}
+
+// WatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
+type WatcherOptions = TypedWatcherOptions[client.Object, ctrl.Request]
+
+// TypedWatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
 // for the given Cluster.
-type WatchInput struct {
+type TypedWatcherOptions[object client.Object, request comparable] struct {
 	// Name represents a unique Watch request for the specified Cluster.
 	// The name is used to track that a specific watch is only added once to a cache.
 	// After a connection (and thus also the cache) has been re-created, watches have to be added
@@ -187,16 +199,44 @@ type WatchInput struct {
 	Name string
 
 	// Watcher is the watcher (controller) whose Reconcile() function will be called for events.
-	Watcher Watcher
+	Watcher SourceWatcher[request]
 
 	// Kind is the type of resource to watch.
-	Kind client.Object
+	Kind object
 
 	// EventHandler contains the event handlers to invoke for resource events.
-	EventHandler handler.EventHandler
+	EventHandler handler.TypedEventHandler[object, request]
 
 	// Predicates is used to filter resource events.
-	Predicates []predicate.Predicate
+	Predicates []predicate.TypedPredicate[object]
+}
+
+// NewWatcher creates a Watcher for the workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the SourceWatcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
+func NewWatcher[object client.Object, request comparable](options TypedWatcherOptions[object, request]) Watcher {
+	return &watcher[object, request]{
+		name: options.Name,
+		kind: options.Kind,
+		eventHandler: options.EventHandler,
+		predicates: options.Predicates,
+		watcher: options.Watcher,
+	}
+}
+
+type watcher[object client.Object, request comparable] struct {
+	name string
+	kind object
+	eventHandler handler.TypedEventHandler[object, request]
+	predicates []predicate.TypedPredicate[object]
+	watcher SourceWatcher[request]
+}
+
+func (tw *watcher[object, request]) Name() string { return tw.name }
+func (tw *watcher[object, request]) Object() client.Object { return tw.kind }
+func (tw *watcher[object, request]) Watch(cache cache.Cache) error {
+	return tw.watcher.Watch(source.TypedKind[object, request](cache, tw.kind, tw.eventHandler, tw.predicates...))
 }
 
 // GetClusterSourceOption is an option that modifies GetClusterSourceOptions for a GetClusterSource call.
@@ -342,12 +382,12 @@ func (cc *clusterCache) GetClientCertificatePrivateKey(ctx context.Context, clus
 	return accessor.GetClientCertificatePrivateKey(ctx), nil
 }
 
-func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error {
+func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error {
 	accessor := cc.getClusterAccessor(cluster)
 	if accessor == nil {
-		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
 	}
-	return accessor.Watch(ctx, input)
+	return accessor.Watch(ctx, watcher)
 }
 
 func (cc *clusterCache) GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time {
diff --git a/exp/internal/controllers/machinepool_controller.go b/exp/internal/controllers/machinepool_controller.go
index ee956dc0d7f2..74e7c603b334 100644
--- a/exp/internal/controllers/machinepool_controller.go
+++ b/exp/internal/controllers/machinepool_controller.go
@@ -371,12 +371,12 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *
 		return nil
 	}
 
-	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
 		Name: "machinepool-watchNodes",
 		Watcher: r.controller,
 		Kind: &corev1.Node{},
 		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
-	})
+	}))
 }
 
 func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, o client.Object) []reconcile.Request {
diff --git a/internal/controllers/machine/machine_controller.go b/internal/controllers/machine/machine_controller.go
index afe1371480f1..af74152c4c3a 100644
--- a/internal/controllers/machine/machine_controller.go
+++ b/internal/controllers/machine/machine_controller.go
@@ -1051,12 +1051,12 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C
 		return nil
 	}
 
-	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
 		Name: "machine-watchNodes",
 		Watcher: r.controller,
 		Kind: &corev1.Node{},
 		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachine),
-	})
+	}))
 }
 
 func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []reconcile.Request {
diff --git a/internal/controllers/machine/machine_controller_noderef_test.go b/internal/controllers/machine/machine_controller_noderef_test.go
index c568cf35d03c..ca4f4ce7b92c 100644
--- a/internal/controllers/machine/machine_controller_noderef_test.go
+++ b/internal/controllers/machine/machine_controller_noderef_test.go
@@ -351,14 +351,14 @@ func TestGetNode(t *testing.T) {
 	// Retry because the ClusterCache might not have immediately created the clusterAccessor.
 	g.Eventually(func(g Gomega) {
-		g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.WatchInput{
+		g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.NewWatcher(clustercache.WatcherOptions{
 			Name: "TestGetNode",
 			Watcher: w,
 			Kind: &corev1.Node{},
 			EventHandler: handler.EnqueueRequestsFromMapFunc(func(context.Context, client.Object) []reconcile.Request { return nil }),
-		})).To(Succeed())
+		}))).To(Succeed())
 	}, 1*time.Minute, 5*time.Second).Should(Succeed())
 
 	for _, tc := range testCases {
diff --git a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
index 6e2c135037c5..968ad03484aa 100644
--- a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
+++ b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
@@ -612,12 +612,12 @@ func (r *Reconciler) nodeToMachineHealthCheck(ctx context.Context, o client.Obje
 }
 
 func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
-	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
 		Name: "machinehealthcheck-watchClusterNodes",
 		Watcher: r.controller,
 		Kind: &corev1.Node{},
 		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck),
-	})
+	}))
 }
 
 // getMachineFromNode retrieves the machine with a nodeRef to nodeName
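
Usage note (illustrative, not part of the diff above): the controllers changed in this diff keep the default client.Object/ctrl.Request path via the WatcherOptions alias. The sketch below shows what the generic TypedWatcherOptions could look like with a custom request type; exampleReconciler, nodeRequest, and the typedController field are assumed names for illustration only, while SourceWatcher, NewWatcher, TypedWatcherOptions, ClusterCache, and util.ObjectKey come from the diff and from cluster-api/controller-runtime.

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/handler"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/controllers/clustercache"
	"sigs.k8s.io/cluster-api/util"
)

// nodeRequest is a hypothetical custom request type; any comparable type works.
type nodeRequest struct {
	NodeName string
}

// exampleReconciler is a stand-in for a reconciler that owns a typed controller,
// e.g. a controller.TypedController[nodeRequest], which satisfies SourceWatcher[nodeRequest].
type exampleReconciler struct {
	ClusterCache    clustercache.ClusterCache
	typedController clustercache.SourceWatcher[nodeRequest]
}

func (r *exampleReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
	// TypedWatcherOptions lets the event handler enqueue nodeRequest values directly,
	// instead of converting Node events into ctrl.Request.
	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.TypedWatcherOptions[*corev1.Node, nodeRequest]{
		Name:    "example-watchNodes",
		Watcher: r.typedController,
		Kind:    &corev1.Node{},
		EventHandler: handler.TypedEnqueueRequestsFromMapFunc(func(_ context.Context, node *corev1.Node) []nodeRequest {
			return []nodeRequest{{NodeName: node.Name}}
		}),
	}))
}

The comparable request type parameter is the point of the refactor: a typed controller built with controller-runtime's typed sources and handlers can watch workload-cluster objects through the ClusterCache without going through ctrl.Request.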