Skip to content

Commit

Permalink
Fix races in ClusterCache tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JoelSpeed committed May 4, 2020
1 parent 7c2ee69 commit 859329b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 17 deletions.
3 changes: 2 additions & 1 deletion controllers/remote/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,9 @@ func (m *ClusterCacheTracker) healthCheckCluster(in *healthCheckInput) {

// healthCheckPath attempts to request a given absolute path from the API server
// defined in the rest.Config and returns any errors that occurred during the request.
func healthCheckPath(cfg *rest.Config, requestTimeout time.Duration, path string) error {
func healthCheckPath(sourceCfg *rest.Config, requestTimeout time.Duration, path string) error {
codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
cfg := rest.CopyConfig(sourceCfg)
cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})

restClient, err := rest.UnversionedRESTClientFor(cfg)
Expand Down
24 changes: 21 additions & 3 deletions controllers/remote/cluster_cache_healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,15 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {

// This should succeed after 1 second, approx 10 requests to the API
Consistently(func() *clusterCache {
cct.clusterCachesLock.RLock()
defer cct.clusterCachesLock.RUnlock()
return cct.clusterCaches[testClusterKey]
}).ShouldNot(BeNil())
Expect(cc.stopped).To(BeFalse())
Expect(func() bool {
cc.lock.Lock()
defer cc.lock.Unlock()
return cc.stopped
}()).To(BeFalse())
})

It("with an invalid path", func() {
Expand All @@ -127,9 +133,15 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {

// This should succeed after 3 consecutive failed requests
Eventually(func() *clusterCache {
cct.clusterCachesLock.RLock()
defer cct.clusterCachesLock.RUnlock()
return cct.clusterCaches[testClusterKey]
}).Should(BeNil())
Expect(cc.stopped).To(BeTrue())
Expect(func() bool {
cc.lock.Lock()
defer cc.lock.Unlock()
return cc.stopped
}()).To(BeTrue())
})

It("with an invalid config", func() {
Expand All @@ -149,9 +161,15 @@ var _ = Describe("ClusterCache HealthCheck suite", func() {

// This should succeed after 3 consecutive failed requests
Eventually(func() *clusterCache {
cct.clusterCachesLock.RLock()
defer cct.clusterCachesLock.RUnlock()
return cct.clusterCaches[testClusterKey]
}).Should(BeNil())
Expect(cc.stopped).To(BeTrue())
Expect(func() bool {
cc.lock.Lock()
defer cc.lock.Unlock()
return cc.stopped
}()).To(BeTrue())
})
})
})
22 changes: 19 additions & 3 deletions controllers/remote/cluster_cache_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
})).To(Succeed())

// Check that a cache was created for the cluster
cc := cct.clusterCaches[testClusterKey]
cc := func() *clusterCache {
cct.clusterCachesLock.RLock()
defer cct.clusterCachesLock.RUnlock()
return cct.clusterCaches[testClusterKey]
}()
Expect(cc).ToNot(BeNil())
return reconcile.Request{NamespacedName: testClusterKey}, cc
}
Expand Down Expand Up @@ -178,6 +182,8 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
By(fmt.Sprintf("Checking cluster %q's cache is stopped", cluster.key.Name))
cc := cluster.cache
Eventually(func() bool {
cc.lock.Lock()
defer cc.lock.Unlock()
return cc.stopped
}, timeout).Should(BeTrue())
}
Expand All @@ -187,6 +193,8 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
By(fmt.Sprintf("Checking cluster %q's cache is running", cluster.key.Name))
cc := cluster.cache
Consistently(func() bool {
cc.lock.Lock()
defer cc.lock.Unlock()
return cc.stopped
}).Should(BeFalse())
}
Expand All @@ -201,7 +209,11 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
}
matchers = append(matchers, HaveLen(len(in.runningClusters())))

Eventually(cct.clusterCaches, timeout).Should(SatisfyAll(matchers...))
Eventually(func() map[client.ObjectKey]*clusterCache {
cct.clusterCachesLock.RLock()
defer cct.clusterCachesLock.RUnlock()
return cct.clusterCaches
}, timeout).Should(SatisfyAll(matchers...))
})

By("Checking deleted Cluster's Watches are removed", func() {
Expand All @@ -214,7 +226,11 @@ var _ = Describe("ClusterCache Reconciler suite", func() {
}
matchers = append(matchers, HaveLen(len(in.runningClusters())))

Eventually(cct.watches, timeout).Should(SatisfyAll(matchers...))
Eventually(func() map[client.ObjectKey]map[watchInfo]struct{} {
cct.watchesLock.RLock()
defer cct.watchesLock.RUnlock()
return cct.watches
}(), timeout).Should(SatisfyAll(matchers...))
})
},
Entry("when no clusters deleted", &clusterDeletedInput{
Expand Down
45 changes: 35 additions & 10 deletions controllers/remote/cluster_cache_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,24 @@ var _ = Describe("ClusterCache Tracker suite", func() {

var assertWatch = func(i *assertWatchInput) {
It("should create a running cache for the cluster", func() {
cache, ok := i.tracker.clusterCaches[i.clusterKey]
cache, ok := func() (*clusterCache, bool) {
i.tracker.clusterCachesLock.RLock()
defer i.tracker.clusterCachesLock.RUnlock()
cc, ok := i.tracker.clusterCaches[i.clusterKey]
return cc, ok
}()
Expect(ok).To(BeTrue())
stop := make(chan struct{})
Expect(cache.stopped).To(BeFalse())
Expect(cache.stop).ToNot(BeNil())
Expect(func() bool {
cache.lock.Lock()
defer cache.lock.Unlock()
return cache.stopped
}()).To(BeFalse())
Expect(func() chan struct{} {
cache.lock.Lock()
defer cache.lock.Unlock()
return cache.stop
}()).ToNot(BeNil())
// WaitForCacheSync returns false if Start was not called
Expect(cache.Cache.WaitForCacheSync(stop)).To(BeTrue())
})
Expand All @@ -70,7 +83,11 @@ var _ = Describe("ClusterCache Tracker suite", func() {
kind: i.kind,
eventHandler: i.eventHandler,
}
Expect(i.tracker.watches[i.clusterKey]).To(HaveKey(Equal(expectedInfo)))
Expect(func() map[watchInfo]struct{} {
i.tracker.watchesLock.RLock()
defer i.tracker.watchesLock.RUnlock()
return i.tracker.watches[i.clusterKey]
}()).To(HaveKey(Equal(expectedInfo)))
})

It("should call the Watcher with the correct values", func() {
Expand All @@ -83,7 +100,6 @@ var _ = Describe("ClusterCache Tracker suite", func() {

Expect(infos).To(ContainElement(
Equal(testWatcherInfo{
source: source.NewKindWithCache(i.kind, i.tracker.clusterCaches[i.clusterKey]),
handler: i.eventHandler,
predicates: i.predicates,
}),
Expand Down Expand Up @@ -318,14 +334,25 @@ var _ = Describe("ClusterCache Tracker suite", func() {
assertWatch(&differentClusterInput)

It("should have independent caches for each cluster", func() {
testClusterCache, ok := cct.clusterCaches[input.clusterKey]
testClusterCache, ok := func() (*clusterCache, bool) {
cct.clusterCachesLock.RLock()
defer cct.clusterCachesLock.RUnlock()
cc, ok := cct.clusterCaches[input.clusterKey]
return cc, ok
}()
Expect(ok).To(BeTrue())
differentClusterCache, ok := cct.clusterCaches[differentClusterInput.clusterKey]
differentClusterCache, ok := func() (*clusterCache, bool) {
cct.clusterCachesLock.RLock()
defer cct.clusterCachesLock.RUnlock()
cc, ok := cct.clusterCaches[differentClusterInput.clusterKey]
return cc, ok
}()
Expect(ok).To(BeTrue())
// Check stop channels are different, so that they can be stopped independently
Expect(testClusterCache.stop).ToNot(Equal(differentClusterCache.stop))
// Caches should also be different as they were constructed for different clusters
Expect(testClusterCache.Cache).ToNot(Equal(differentClusterCache.Cache))
// Check the memory locations to assert independence
Expect(fmt.Sprintf("%p", testClusterCache.Cache)).ToNot(Equal(fmt.Sprintf("%p", differentClusterCache.Cache)))
})
})
})
Expand All @@ -338,14 +365,12 @@ type testWatcher struct {
}

type testWatcherInfo struct {
source source.Source
handler handler.EventHandler
predicates []predicate.Predicate
}

func (t *testWatcher) Watch(s source.Source, h handler.EventHandler, ps ...predicate.Predicate) error {
t.watchInfo <- testWatcherInfo{
source: s,
handler: h,
predicates: ps,
}
Expand Down

0 comments on commit 859329b

Please sign in to comment.