diff --git a/controllers/remote/cluster_cache.go b/controllers/remote/cluster_cache.go
index dd4cd70131df..7e82d4636421 100644
--- a/controllers/remote/cluster_cache.go
+++ b/controllers/remote/cluster_cache.go
@@ -344,8 +344,9 @@ func (m *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) {
 
 // healthCheckPath attempts to request a given absolute path from the API server
 // defined in the rest.Config and returns any errors that occurred during the request.
-func healthCheckPath(cfg *rest.Config, requestTimeout time.Duration, path string) error {
+func healthCheckPath(sourceCfg *rest.Config, requestTimeout time.Duration, path string) error {
 	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
+	cfg := rest.CopyConfig(sourceCfg)
 	cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
 
 	restClient, err := rest.UnversionedRESTClientFor(cfg)
diff --git a/controllers/remote/cluster_cache_healthcheck_test.go b/controllers/remote/cluster_cache_healthcheck_test.go
index f0ac2ffb0e78..58934a5df1d5 100644
--- a/controllers/remote/cluster_cache_healthcheck_test.go
+++ b/controllers/remote/cluster_cache_healthcheck_test.go
@@ -114,9 +114,15 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {
 
 			// This should succed after 1 second, approx 10 requests to the API
 			Consistently(func() *clusterCache {
+				cct.clusterCachesLock.RLock()
+				defer cct.clusterCachesLock.RUnlock()
 				return cct.clusterCaches[testClusterKey]
 			}).ShouldNot(BeNil())
-			Expect(cc.stopped).To(BeFalse())
+			Expect(func() bool {
+				cc.lock.Lock()
+				defer cc.lock.Unlock()
+				return cc.stopped
+			}()).To(BeFalse())
 		})
 
 		It("with an invalid path", func() {
@@ -127,9 +133,15 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {
 
 			// This should succeed after 3 consecutive failed requests
 			Eventually(func() *clusterCache {
+				cct.clusterCachesLock.RLock()
+				defer cct.clusterCachesLock.RUnlock()
 				return cct.clusterCaches[testClusterKey]
 			}).Should(BeNil())
-			Expect(cc.stopped).To(BeTrue())
+			Expect(func() bool {
+				cc.lock.Lock()
+				defer cc.lock.Unlock()
+				return cc.stopped
+			}()).To(BeTrue())
 		})
 
 		It("with an invalid config", func() {
@@ -149,9 +161,15 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {
 
 			// This should succeed after 3 consecutive failed requests
 			Eventually(func() *clusterCache {
+				cct.clusterCachesLock.RLock()
+				defer cct.clusterCachesLock.RUnlock()
 				return cct.clusterCaches[testClusterKey]
 			}).Should(BeNil())
-			Expect(cc.stopped).To(BeTrue())
+			Expect(func() bool {
+				cc.lock.Lock()
+				defer cc.lock.Unlock()
+				return cc.stopped
+			}()).To(BeTrue())
 		})
 	})
})
diff --git a/controllers/remote/cluster_cache_reconciler_test.go b/controllers/remote/cluster_cache_reconciler_test.go
index 6728dca9fd75..4f5758cef1c5 100644
--- a/controllers/remote/cluster_cache_reconciler_test.go
+++ b/controllers/remote/cluster_cache_reconciler_test.go
@@ -89,7 +89,11 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 		})).To(Succeed())
 
 		// Check that a cache was created for the cluster
-		cc := cct.clusterCaches[testClusterKey]
+		cc := func() *clusterCache {
+			cct.clusterCachesLock.RLock()
+			defer cct.clusterCachesLock.RUnlock()
+			return cct.clusterCaches[testClusterKey]
+		}()
 		Expect(cc).ToNot(BeNil())
 		return reconcile.Request{NamespacedName: testClusterKey}, cc
 	}
@@ -178,6 +182,8 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 				By(fmt.Sprintf("Checking cluster %q's cache is stopped", cluster.key.Name))
 				cc := cluster.cache
 				Eventually(func() bool {
+					cc.lock.Lock()
+					defer cc.lock.Unlock()
 					return cc.stopped
 				}, timeout).Should(BeTrue())
 			}
@@ -187,6 +193,8 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 				By(fmt.Sprintf("Checking cluster %q's cache is running", cluster.key.Name))
 				cc := cluster.cache
 				Consistently(func() bool {
+					cc.lock.Lock()
+					defer cc.lock.Unlock()
 					return cc.stopped
 				}).Should(BeFalse())
 			}
@@ -201,7 +209,11 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 			}
 			matchers = append(matchers, HaveLen(len(in.runningClusters())))
 
-			Eventually(cct.clusterCaches, timeout).Should(SatisfyAll(matchers...))
+			Eventually(func() map[client.ObjectKey]*clusterCache {
+				cct.clusterCachesLock.RLock()
+				defer cct.clusterCachesLock.RUnlock()
+				return cct.clusterCaches
+			}, timeout).Should(SatisfyAll(matchers...))
 		})
 
 		By("Checking deleted Cluster's Watches are removed", func() {
@@ -214,7 +226,11 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
 			}
 			matchers = append(matchers, HaveLen(len(in.runningClusters())))
 
-			Eventually(cct.watches, timeout).Should(SatisfyAll(matchers...))
+			Eventually(func() map[client.ObjectKey]map[watchInfo]struct{} {
+				cct.watchesLock.RLock()
+				defer cct.watchesLock.RUnlock()
+				return cct.watches
+			}, timeout).Should(SatisfyAll(matchers...))
 		})
 	},
 	Entry("when no clusters deleted", &clusterDeletedInput{
diff --git a/controllers/remote/cluster_cache_tracker_test.go b/controllers/remote/cluster_cache_tracker_test.go
index 45bfacac4ddb..3b0b450feedf 100644
--- a/controllers/remote/cluster_cache_tracker_test.go
+++ b/controllers/remote/cluster_cache_tracker_test.go
@@ -55,11 +55,24 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 
 	var assertWatch = func(i *assertWatchInput) {
 		It("should create a running cache for the cluster", func() {
-			cache, ok := i.tracker.clusterCaches[i.clusterKey]
+			cache, ok := func() (*clusterCache, bool) {
+				i.tracker.clusterCachesLock.RLock()
+				defer i.tracker.clusterCachesLock.RUnlock()
+				cc, ok := i.tracker.clusterCaches[i.clusterKey]
+				return cc, ok
+			}()
 			Expect(ok).To(BeTrue())
 			stop := make(chan struct{})
-			Expect(cache.stopped).To(BeFalse())
-			Expect(cache.stop).ToNot(BeNil())
+			Expect(func() bool {
+				cache.lock.Lock()
+				defer cache.lock.Unlock()
+				return cache.stopped
+			}()).To(BeFalse())
+			Expect(func() chan struct{} {
+				cache.lock.Lock()
+				defer cache.lock.Unlock()
+				return cache.stop
+			}()).ToNot(BeNil())
 			// WaitForCacheSync returns false if Start was not called
 			Expect(cache.Cache.WaitForCacheSync(stop)).To(BeTrue())
 		})
@@ -70,7 +83,11 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 				kind:         i.kind,
 				eventHandler: i.eventHandler,
 			}
-			Expect(i.tracker.watches[i.clusterKey]).To(HaveKey(Equal(expectedInfo)))
+			Expect(func() map[watchInfo]struct{} {
+				i.tracker.watchesLock.RLock()
+				defer i.tracker.watchesLock.RUnlock()
+				return i.tracker.watches[i.clusterKey]
+			}()).To(HaveKey(Equal(expectedInfo)))
 		})
 
 		It("should call the Watcher with the correct values", func() {
@@ -83,7 +100,6 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 
 			Expect(infos).To(ContainElement(
 				Equal(testWatcherInfo{
-					source:     source.NewKindWithCache(i.kind, i.tracker.clusterCaches[i.clusterKey]),
 					handler:    i.eventHandler,
 					predicates: i.predicates,
 				}),
@@ -318,14 +334,25 @@ var _ = Describe("ClusterCache Tracker suite", func() {
 		assertWatch(&differentClusterInput)
 
 		It("should have independent caches for each cluster", func() {
-			testClusterCache, ok := cct.clusterCaches[input.clusterKey]
+			testClusterCache, ok := func() (*clusterCache, bool) {
+				cct.clusterCachesLock.RLock()
+				defer cct.clusterCachesLock.RUnlock()
+				cc, ok := cct.clusterCaches[input.clusterKey]
+				return cc, ok
+			}()
 			Expect(ok).To(BeTrue())
-			differentClusterCache, ok := cct.clusterCaches[differentClusterInput.clusterKey]
+			differentClusterCache, ok := func() (*clusterCache, bool) {
+				cct.clusterCachesLock.RLock()
+				defer cct.clusterCachesLock.RUnlock()
+				cc, ok := cct.clusterCaches[differentClusterInput.clusterKey]
+				return cc, ok
+			}()
 			Expect(ok).To(BeTrue())
 			// Check stop channels are different, so that they can be stopped independently
 			Expect(testClusterCache.stop).ToNot(Equal(differentClusterCache.stop))
 			// Caches should also be different as they were constructed for different clusters
-			Expect(testClusterCache.Cache).ToNot(Equal(differentClusterCache.Cache))
+			// Check the memory locations to assert independence
+			Expect(fmt.Sprintf("%p", testClusterCache.Cache)).ToNot(Equal(fmt.Sprintf("%p", differentClusterCache.Cache)))
 		})
 	})
})
@@ -338,14 +365,12 @@ type testWatcher struct {
 }
 
 type testWatcherInfo struct {
-	source     source.Source
 	handler    handler.EventHandler
 	predicates []predicate.Predicate
}
 
 func (t *testWatcher) Watch(s source.Source, h handler.EventHandler, ps ...predicate.Predicate) error {
 	t.watchInfo <- testWatcherInfo{
-		source:     s,
 		handler:    h,
 		predicates: ps,
 	}
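
Side note, not part of the patch: the change applies two patterns throughout. The production fix defensively copies the rest.Config with rest.CopyConfig before mutating it, since the caller's config may be shared across goroutines; the test fixes read ClusterCacheTracker state only while holding the matching mutex, returning the value from a locked closure. Below is a minimal, self-contained sketch of both patterns under stated assumptions: trackerState, get, useConfig, and the "health-check" user agent are hypothetical illustration names, not Cluster API code; rest.CopyConfig is the real client-go helper used in the first hunk.

package main

import (
	"fmt"
	"sync"

	"k8s.io/client-go/rest"
)

// trackerState stands in for ClusterCacheTracker: a shared map guarded by an RWMutex.
type trackerState struct {
	lock   sync.RWMutex
	caches map[string]bool
}

// get mirrors the lock-then-read closures added in the tests: the shared map is
// touched only while the read lock is held, and the result escapes by value.
func (t *trackerState) get(key string) (bool, bool) {
	t.lock.RLock()
	defer t.lock.RUnlock()
	v, ok := t.caches[key]
	return v, ok
}

// useConfig mirrors the healthCheckPath fix: copy the caller's rest.Config and
// mutate only the copy, so concurrent users of the original never observe the change.
func useConfig(sourceCfg *rest.Config) *rest.Config {
	cfg := rest.CopyConfig(sourceCfg)
	cfg.UserAgent = "health-check" // hypothetical mutation; the patch sets NegotiatedSerializer
	return cfg
}

func main() {
	ts := &trackerState{caches: map[string]bool{"cluster-a": true}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = ts.get("cluster-a") // concurrent readers are safe under RLock
		}()
	}
	wg.Wait()
	fmt.Println(useConfig(&rest.Config{Host: "https://example.invalid"}).UserAgent)
}

Returning copies from locked closures, rather than handing out references and locking at each use site, keeps the critical section small and makes it impossible for a later refactor to read the field unguarded, which is why the tests wrap even single-field reads like cc.stopped this way.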