From 58f919c5e2de3f3b7a2026ab96a0e06c5b3526ff Mon Sep 17 00:00:00 2001
From: Joel Speed
Date: Tue, 16 Jun 2020 17:33:56 +0100
Subject: [PATCH] Fix for watch info matching in cluster cache tracker

The cluster cache tracker tries to deduplicate watch requests by
matching the WatchInput kind, watcher, and eventHandler against the
watches it has already started. However, this comparison failed for
real-world event handlers: the watchInfo struct was used directly as a
map key, and Go's built-in equality compares its interface fields by
pointer identity, so two separately constructed but otherwise identical
inputs never matched.

Switching to reflect.DeepEqual helps with the equality of the watcher
and kind, but it still fails for event handlers, because they embed
functions and DeepEqual always returns false for non-nil functions.

As a workaround, we replace the eventHandler with a string
representation of itself, which identifies the struct type and the
pointers of the functions embedded within it. Combined with the
watcher, this should be sufficiently unique to identify a particular
codepath calling Watch.
---
 controllers/remote/cluster_cache.go           | 41 +++++++++++++++----
 .../remote/cluster_cache_tracker_test.go      | 32 ++++++++++++---
 2 files changed, 59 insertions(+), 14 deletions(-)

diff --git a/controllers/remote/cluster_cache.go b/controllers/remote/cluster_cache.go
index 5d03c842df9e..933116f874d6 100644
--- a/controllers/remote/cluster_cache.go
+++ b/controllers/remote/cluster_cache.go
@@ -19,6 +19,7 @@ package remote
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"sync"
 	"time"
 
@@ -27,6 +28,7 @@
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -111,9 +113,23 @@ type Watcher interface {
 
 // watchInfo is used as a map key to uniquely identify a watch. Because predicates is a slice, it cannot be included.
 type watchInfo struct {
-	watcher      Watcher
-	kind         runtime.Object
-	eventHandler handler.EventHandler
+	watcher Watcher
+	gvk     schema.GroupVersionKind
+
+	// Comparing the eventHandler as an interface doesn't work because
+	// reflect.DeepEqual always reports non-nil functions as unequal.
+	// Use a string representation of the handler instead, as strings can be compared.
+	// The signature function is expected to produce a unique output for each unique
+	// handler function that is passed to it.
+	// In combination with the watcher, this should be enough to identify unique watches.
+	eventHandlerSignature string
+}
+
+// eventHandlerSignature generates a unique identifier for the given eventHandler by
+// printing it to a string using "%#v".
+// E.g. "&handler.EnqueueRequestsFromMapFunc{ToRequests:(handler.ToRequestsFunc)(0x271afb0)}"
+func eventHandlerSignature(h handler.EventHandler) string {
+	return fmt.Sprintf("%#v", h)
 }
 
 // watchExists returns true if watch has already been established. This does NOT hold any lock.
@@ -123,8 +139,12 @@ func (m *ClusterCacheTracker) watchExists(cluster client.ObjectKey, watch watchInfo) bool {
 		return false
 	}
 
-	_, watchFound := watchesForCluster[watch]
-	return watchFound
+	for w := range watchesForCluster {
+		if reflect.DeepEqual(w, watch) {
+			return true
+		}
+	}
+	return false
 }
 
 // deleteWatchesForCluster removes the watches for cluster from the tracker.
@@ -156,10 +176,15 @@ type WatchInput struct {
 
 // Watch watches a remote cluster for resource events. If the watch already exists based on cluster, watcher,
 // kind, and eventHandler, then this is a no-op.
 func (m *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error {
+	gvk, err := apiutil.GVKForObject(input.Kind, m.scheme)
+	if err != nil {
+		return err
+	}
+
 	wi := watchInfo{
-		watcher:      input.Watcher,
-		kind:         input.Kind,
-		eventHandler: input.EventHandler,
+		watcher:               input.Watcher,
+		gvk:                   gvk,
+		eventHandlerSignature: eventHandlerSignature(input.EventHandler),
 	}
 
 	// First, check if the watch already exists
diff --git a/controllers/remote/cluster_cache_tracker_test.go b/controllers/remote/cluster_cache_tracker_test.go
index 27e06675b557..7a62c7f0adcd 100644
--- a/controllers/remote/cluster_cache_tracker_test.go
+++ b/controllers/remote/cluster_cache_tracker_test.go
@@ -25,6 +25,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/util/workqueue"
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
@@ -36,6 +37,7 @@
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 )
 
@@ -46,6 +48,7 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 		clusterKey   client.ObjectKey
 		watcher      Watcher
 		kind         runtime.Object
+		gvkForKind   schema.GroupVersionKind
 		eventHandler handler.EventHandler
 		predicates   []predicate.Predicate
 		watcherInfo  chan testWatcherInfo
@@ -78,9 +81,9 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 
 	It("should add a watchInfo for the watch", func() {
 		expectedInfo := watchInfo{
-			watcher:      i.watcher,
-			kind:         i.kind,
-			eventHandler: i.eventHandler,
+			watcher:               i.watcher,
+			gvk:                   i.gvkForKind,
+			eventHandlerSignature: eventHandlerSignature(i.eventHandler),
 		}
 		Expect(func() map[watchInfo]struct{} {
 			i.tracker.watchesLock.RLock()
@@ -118,6 +121,10 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 	var testNamespace *corev1.Namespace
 	var input assertWatchInput
 
+	mapper := func(_ handler.MapObject) []reconcile.Request {
+		return []reconcile.Request{}
+	}
+
 	BeforeEach(func() {
 		By("Setting up a new manager")
 		var err error
@@ -161,13 +168,15 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 		testClusterKey := util.ObjectKey(testCluster)
 		watcher, watcherInfo := newTestWatcher()
 		kind := &corev1.Node{}
-		eventHandler := &handler.Funcs{}
+		gvkForNode := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Node"}
+		eventHandler := &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)}
 
 		input = assertWatchInput{
 			cct,
 			testClusterKey,
 			watcher,
 			kind,
+			gvkForNode,
 			eventHandler,
 			[]predicate.Predicate{
 				&predicate.ResourceVersionChangedPredicate{},
@@ -203,11 +212,19 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 	Context("when Watch is called for a second time with the same input", func() {
 		BeforeEach(func() {
 			By("Calling watch on the test cluster")
+
+			kind := &corev1.Node{}
+			eventHandler := &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)}
+
+			// Check the second copies match
+			Expect(kind).To(Equal(input.kind))
+			Expect(fmt.Sprintf("%#v", eventHandler)).To(Equal(fmt.Sprintf("%#v", input.eventHandler)))
+
 			Expect(cct.Watch(ctx, WatchInput{
 				Cluster:      input.clusterKey,
 				Watcher:      input.watcher,
-				Kind:         input.kind,
-				EventHandler: input.eventHandler,
+				Kind:         kind,
+				EventHandler: eventHandler,
 				Predicates:   input.predicates,
 			})).To(Succeed())
 		})
@@ -218,7 +235,9 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 	Context("when watch is called with a different Kind", func() {
 		BeforeEach(func() {
 			configMapKind := &corev1.ConfigMap{}
+			configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}
 			input.kind = configMapKind
+			input.gvkForKind = configMapGVK
 			input.watchCount = 2
 
 			By("Calling watch on the test cluster")
@@ -284,6 +303,7 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 			input.clusterKey,
 			input.watcher,
 			input.kind,
+			input.gvkForKind,
 			input.eventHandler,
 			[]predicate.Predicate{
 				&predicate.ResourceVersionChangedPredicate{},
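
The comparison problem, and the "%#v" workaround, can be demonstrated
without controller-runtime at all. The sketch below is plain, standalone
Go; mapFunc and enqueueFromMapFunc are hypothetical stand-ins invented
here for handler.ToRequestsFunc and handler.EnqueueRequestsFromMapFunc:

package main

import (
	"fmt"
	"reflect"
)

// mapFunc and enqueueFromMapFunc mimic an event handler struct that
// embeds a function, like handler.EnqueueRequestsFromMapFunc.
type mapFunc func(obj interface{}) []string

type enqueueFromMapFunc struct {
	ToRequests mapFunc
}

func mapper(obj interface{}) []string { return nil }

func main() {
	// Two handlers built around the same mapper, as two independent
	// callers of Watch would construct them.
	a := &enqueueFromMapFunc{ToRequests: mapper}
	b := &enqueueFromMapFunc{ToRequests: mapper}

	// Map-key/pointer equality fails: a and b are distinct allocations.
	fmt.Println(a == b) // false

	// reflect.DeepEqual also fails: it reports non-nil functions as
	// unequal even though both fields hold the very same function.
	fmt.Println(reflect.DeepEqual(a, b)) // false

	// "%#v" renders the struct type plus the embedded function's code
	// pointer, identical for both values, so the strings compare equal.
	fmt.Println(fmt.Sprintf("%#v", a) == fmt.Sprintf("%#v", b)) // true
}

The signature therefore identifies the handler's type and the functions
it wraps, i.e. a particular call path into Watch, which is exactly the
property the deduplication relies on.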
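
On the kind side, the patch stores a schema.GroupVersionKind rather
than the runtime.Object itself: a GroupVersionKind is a plain
comparable struct of three strings, so identical Kinds always yield
identical values, with no pointer identity involved. Below is a minimal
sketch of the resolution step the patched Watch performs, assuming the
controller-runtime apiutil package and client-go scheme of this era of
the codebase:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

func main() {
	// Resolve the GroupVersionKind for a concrete object via the scheme,
	// mirroring apiutil.GVKForObject(input.Kind, m.scheme) in Watch.
	gvk, err := apiutil.GVKForObject(&corev1.Node{}, scheme.Scheme)
	if err != nil {
		panic(err)
	}
	fmt.Println(gvk) // prints: /v1, Kind=Node

	// GVKs resolved from two separately allocated Nodes compare equal,
	// unlike the *corev1.Node pointers themselves.
	gvk2, _ := apiutil.GVKForObject(&corev1.Node{}, scheme.Scheme)
	fmt.Println(gvk == gvk2) // true
}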