Skip to content

Commit

Permalink
Merge pull request #3201 from JoelSpeed/fix-watchinfo-equality
Browse files Browse the repository at this point in the history
 🐛  Fix for watch info matching in cluster cache tracker
  • Loading branch information
k8s-ci-robot authored Jun 19, 2020
2 parents 257525f + 58f919c commit f2d71a3
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
41 changes: 33 additions & 8 deletions controllers/remote/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package remote
import (
"context"
"fmt"
"reflect"
"sync"
"time"

Expand All @@ -27,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -111,9 +113,23 @@ type Watcher interface {

// watchInfo is used as a map key to uniquely identify a watch. Because predicates is a slice, it cannot be included.
type watchInfo struct {
watcher Watcher
kind runtime.Object
eventHandler handler.EventHandler
watcher Watcher
gvk schema.GroupVersionKind

// Comparing the eventHandler as an interface doesn't work because reflect.DeepEqual
// will assert functions are false if they are non-nil.
// Use a signature string representation instead as this can be compared.
// The signature function is expected to produce a unique output for each unique handler
// function that is passed to it.
// In combination with the watcher, this should be enough to identify unique watches.
eventHandlerSignature string
}

// eventHandlerSignature generates a unique identifier for the given eventHandler by
// printing it to a string using "%#v".
// Eg "&handler.EnqueueRequestsFromMapFunc{ToRequests:(handler.ToRequestsFunc)(0x271afb0)}"
func eventHandlerSignature(h handler.EventHandler) string {
return fmt.Sprintf("%#v", h)
}

// watchExists returns true if watch has already been established. This does NOT hold any lock.
Expand All @@ -123,8 +139,12 @@ func (m *ClusterCacheTracker) watchExists(cluster client.ObjectKey, watch watchI
return false
}

_, watchFound := watchesForCluster[watch]
return watchFound
for w := range watchesForCluster {
if reflect.DeepEqual(w, watch) {
return true
}
}
return false
}

// deleteWatchesForCluster removes the watches for cluster from the tracker.
Expand Down Expand Up @@ -156,10 +176,15 @@ type WatchInput struct {
// Watch watches a remote cluster for resource events. If the watch already exists based on cluster, watcher,
// kind, and eventHandler, then this is a no-op.
func (m *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error {
gvk, err := apiutil.GVKForObject(input.Kind, m.scheme)
if err != nil {
return err
}

wi := watchInfo{
watcher: input.Watcher,
kind: input.Kind,
eventHandler: input.EventHandler,
watcher: input.Watcher,
gvk: gvk,
eventHandlerSignature: eventHandlerSignature(input.EventHandler),
}

// First, check if the watch already exists
Expand Down
32 changes: 26 additions & 6 deletions controllers/remote/cluster_cache_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
Expand All @@ -36,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

Expand All @@ -46,6 +48,7 @@ var _ = Describe("ClusterCache Tracker suite", func() {
clusterKey client.ObjectKey
watcher Watcher
kind runtime.Object
gvkForKind schema.GroupVersionKind
eventHandler handler.EventHandler
predicates []predicate.Predicate
watcherInfo chan testWatcherInfo
Expand Down Expand Up @@ -78,9 +81,9 @@ var _ = Describe("ClusterCache Tracker suite", func() {

It("should add a watchInfo for the watch", func() {
expectedInfo := watchInfo{
watcher: i.watcher,
kind: i.kind,
eventHandler: i.eventHandler,
watcher: i.watcher,
gvk: i.gvkForKind,
eventHandlerSignature: eventHandlerSignature(i.eventHandler),
}
Expect(func() map[watchInfo]struct{} {
i.tracker.watchesLock.RLock()
Expand Down Expand Up @@ -118,6 +121,10 @@ var _ = Describe("ClusterCache Tracker suite", func() {
var testNamespace *corev1.Namespace
var input assertWatchInput

mapper := func(_ handler.MapObject) []reconcile.Request {
return []reconcile.Request{}
}

BeforeEach(func() {
By("Setting up a new manager")
var err error
Expand Down Expand Up @@ -161,13 +168,15 @@ var _ = Describe("ClusterCache Tracker suite", func() {
testClusterKey := util.ObjectKey(testCluster)
watcher, watcherInfo := newTestWatcher()
kind := &corev1.Node{}
eventHandler := &handler.Funcs{}
gvkForNode := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Node"}
eventHandler := &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)}

input = assertWatchInput{
cct,
testClusterKey,
watcher,
kind,
gvkForNode,
eventHandler,
[]predicate.Predicate{
&predicate.ResourceVersionChangedPredicate{},
Expand Down Expand Up @@ -203,11 +212,19 @@ var _ = Describe("ClusterCache Tracker suite", func() {
Context("when Watch is called for a second time with the same input", func() {
BeforeEach(func() {
By("Calling watch on the test cluster")

kind := &corev1.Node{}
eventHandler := &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(mapper)}

// Check the second copies match
Expect(kind).To(Equal(input.kind))
Expect(fmt.Sprintf("%#v", eventHandler)).To(Equal(fmt.Sprintf("%#v", input.eventHandler)))

Expect(cct.Watch(ctx, WatchInput{
Cluster: input.clusterKey,
Watcher: input.watcher,
Kind: input.kind,
EventHandler: input.eventHandler,
Kind: kind,
EventHandler: eventHandler,
Predicates: input.predicates,
})).To(Succeed())
})
Expand All @@ -218,7 +235,9 @@ var _ = Describe("ClusterCache Tracker suite", func() {
Context("when watch is called with a different Kind", func() {
BeforeEach(func() {
configMapKind := &corev1.ConfigMap{}
configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}
input.kind = configMapKind
input.gvkForKind = configMapGVK
input.watchCount = 2

By("Calling watch on the test cluster")
Expand Down Expand Up @@ -284,6 +303,7 @@ var _ = Describe("ClusterCache Tracker suite", func() {
input.clusterKey,
input.watcher,
input.kind,
input.gvkForKind,
input.eventHandler,
[]predicate.Predicate{
&predicate.ResourceVersionChangedPredicate{},
Expand Down

0 comments on commit f2d71a3

Please sign in to comment.