From 37a60e5909d036367efcacd408cf22f286618d16 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Fri, 11 Nov 2022 14:22:13 +0100 Subject: [PATCH] ClusterCacheTracker: non-blocking per-cluster locking Co-authored-by: Florian Gutmann --- controllers/remote/cluster_cache_tracker.go | 102 ++++++++++++++------ controllers/remote/keyedmutex.go | 78 +++++++++++++++ controllers/remote/keyedmutex_test.go | 89 +++++++++++++++++ go.mod | 5 +- go.sum | 2 + 5 files changed, 247 insertions(+), 29 deletions(-) create mode 100644 controllers/remote/keyedmutex.go create mode 100644 controllers/remote/keyedmutex_test.go diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index 5da6adf1055c..db1ecab04a41 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -54,6 +54,7 @@ const ( healthCheckPollInterval = 10 * time.Second healthCheckRequestTimeout = 5 * time.Second healthCheckUnhealthyThreshold = 10 + initialCacheSyncTimeout = 5 * time.Minute clusterCacheControllerName = "cluster-cache-tracker" ) @@ -64,9 +65,15 @@ type ClusterCacheTracker struct { client client.Client scheme *runtime.Scheme - lock sync.RWMutex + // clusterAccessorsLock is used to lock the access to the clusterAccessors map. + clusterAccessorsLock sync.RWMutex + // clusterAccessors is the map of clusterAccessor by cluster. clusterAccessors map[client.ObjectKey]*clusterAccessor - indexes []Index + // clusterLock is a per-cluster lock used whenever we lock per-cluster actions + // like creating a client or adding watches. + clusterLock *keyedMutex + + indexes []Index // controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker. // This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set. @@ -129,16 +136,14 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt client: manager.GetClient(), scheme: manager.GetScheme(), clusterAccessors: make(map[client.ObjectKey]*clusterAccessor), + clusterLock: newKeyedMutex(), indexes: options.Indexes, }, nil } // GetClient returns a cached client for the given cluster. func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) { - t.lock.Lock() - defer t.lock.Unlock() - - accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...) + accessor, err := t.getClusterAccessor(ctx, cluster, t.indexes...) if err != nil { return nil, err } @@ -148,10 +153,7 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje // GetRESTConfig returns a cached REST config for the given cluster. func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) { - t.lock.Lock() - defer t.lock.Unlock() - - accessor, err := t.getClusterAccessorLH(ctc, cluster, t.indexes...) + accessor, err := t.getClusterAccessor(ctc, cluster, t.indexes...) if err != nil { return nil, err } @@ -169,28 +171,63 @@ type clusterAccessor struct { // clusterAccessorExists returns true if a clusterAccessor exists for cluster. 
func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bool { - t.lock.RLock() - defer t.lock.RUnlock() + t.clusterAccessorsLock.RLock() + defer t.clusterAccessorsLock.RUnlock() _, exists := t.clusterAccessors[cluster] return exists } -// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a -// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held). -func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) { - a := t.clusterAccessors[cluster] +// getClusterAccessor returns a clusterAccessor for cluster. +// It first tries to return an already-created clusterAccessor. +// It then falls back to create a new clusterAccessor if needed. +// If there is already another go routine trying to create a clusterAccessor +// for the same cluster, an error is returned. +func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) { + log := ctrl.LoggerFrom(ctx, "cluster", klog.KRef(cluster.Namespace, cluster.Name)) + + loadExistingAccessor := func() *clusterAccessor { + t.clusterAccessorsLock.RLock() + defer t.clusterAccessorsLock.RUnlock() + return t.clusterAccessors[cluster] + } + storeAccessor := func(a *clusterAccessor) { + t.clusterAccessorsLock.Lock() + defer t.clusterAccessorsLock.Unlock() + t.clusterAccessors[cluster] = a + } + + // If the clusterAccessor already exists, return early. + a := loadExistingAccessor() if a != nil { return a, nil } - a, err := t.newClusterAccessor(ctx, cluster, indexes...) - if err != nil { - return nil, errors.Wrap(err, "error creating client and cache for remote cluster") + // clusterAccessor doesn't exist yet, we might have to initialize one. + // Lock on the cluster to ensure only one clusterAccessor is initialized + // for the cluster at the same time. + // Return an error if another go routine already tries to create a clusterAccessor. + unlockCluster, ok := t.clusterLock.TryLock(cluster) + if !ok { + return nil, errors.Errorf("error creating new cluster accessor: another go routine is already trying to create the cluster accessor for this cluster") } + defer unlockCluster() - t.clusterAccessors[cluster] = a + // Until we got the cluster lock a different goroutine might have initialized the clusterAccessor + // for this cluster successfully already. If this is the case we return it. + a = loadExistingAccessor() + if a != nil { + return a, nil + } + // We are the go routine who has to initialize the clusterAccessor. + log.V(4).Info("Creating new cluster accessor") + a, err := t.newClusterAccessor(ctx, cluster, indexes...) + if err != nil { + return nil, errors.Wrap(err, "error creating new cluster accessor") + } + log.V(4).Info("Storing new cluster accessor") + storeAccessor(a) return a, nil } @@ -265,7 +302,12 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl // Start the cache!!! 
go cache.Start(cacheCtx) //nolint:errcheck - if !cache.WaitForCacheSync(cacheCtx) { + + // Wait until the cache is initially synced + cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout) + defer cacheSyncCtxCancel() + if !cache.WaitForCacheSync(cacheSyncCtx) { + cache.Stop() return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) } @@ -337,8 +379,8 @@ func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.O // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker. func (t *ClusterCacheTracker) deleteAccessor(_ context.Context, cluster client.ObjectKey) { - t.lock.Lock() - defer t.lock.Unlock() + t.clusterAccessorsLock.Lock() + defer t.clusterAccessorsLock.Unlock() a, exists := t.clusterAccessors[cluster] if !exists { @@ -387,14 +429,18 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error return errors.New("input.Name is required") } - t.lock.Lock() - defer t.lock.Unlock() - - a, err := t.getClusterAccessorLH(ctx, input.Cluster, t.indexes...) + a, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...) if err != nil { return err } + // We have to lock the cluster, so that the watch is not created multiple times in parallel. + unlock, ok := t.clusterLock.TryLock(input.Cluster) + if !ok { + return errors.Errorf("failed to add watch: another go routine is already trying to create the cluster accessor") + } + defer unlock() + if a.watches.Has(input.Name) { t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name) return nil @@ -505,7 +551,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health // An error returned implies the health check has failed a sufficient number of // times for the cluster to be considered unhealthy // NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case - // happens when the cache is explicitly stopped.F + // happens when the cache is explicitly stopped. if err != nil && err != wait.ErrWaitTimeout { t.log.Error(err, "Error health checking cluster", "Cluster", klog.KRef(in.cluster.Namespace, in.cluster.Name)) t.deleteAccessor(ctx, in.cluster) diff --git a/controllers/remote/keyedmutex.go b/controllers/remote/keyedmutex.go new file mode 100644 index 000000000000..528c3573de44 --- /dev/null +++ b/controllers/remote/keyedmutex.go @@ -0,0 +1,78 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import "sync" + +// keyedMutex is a mutex locking on the key provided to the Lock function. +// Only one caller can hold the lock for a specific key at a time. +// A second Lock call if the lock is already held for a key returns false. +type keyedMutex struct { + locksMtx sync.Mutex + locks map[interface{}]*sync.Mutex +} + +// newKeyedMutex creates a new keyed mutex ready for use. 
+func newKeyedMutex() *keyedMutex { + return &keyedMutex{ + locks: make(map[interface{}]*sync.Mutex), + } +} + +// unlock unlocks a currently locked key. +type unlock func() + +// TryLock locks the passed in key if it's not already locked. +// Returns the unlock function to release the lock on the key. +// A second Lock call if the lock is already held for a key returns false. +// In the ClusterCacheTracker case the key is the ObjectKey for a cluster. +func (k *keyedMutex) TryLock(key interface{}) (unlock, bool) { + // Get the lock if it doesn't exist already. + // If it does exist, return false. + l, ok := func() (*sync.Mutex, bool) { + k.locksMtx.Lock() + defer k.locksMtx.Unlock() + + l, ok := k.locks[key] + if !ok { + // Lock doesn't exist yet, create one and return it. + l = &sync.Mutex{} + k.locks[key] = l + return l, true + } + + // Lock already exists, return false. + return nil, false + }() + + // Return false if another go routine already holds the lock for this key (e.g. Cluster). + if !ok { + return nil, false + } + + // Lock for the current key (e.g. Cluster). + l.Lock() + + // Unlock the key (e.g. Cluster) and remove it from the lock map. + return func() { + k.locksMtx.Lock() + defer k.locksMtx.Unlock() + + l.Unlock() + delete(k.locks, key) + }, true +} diff --git a/controllers/remote/keyedmutex_test.go b/controllers/remote/keyedmutex_test.go new file mode 100644 index 000000000000..cc4a6d57b538 --- /dev/null +++ b/controllers/remote/keyedmutex_test.go @@ -0,0 +1,89 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "testing" + + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestKeyedMutex(t *testing.T) { + t.Run("Lock a Cluster and ensures the second Lock on the same Cluster returns false", func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + cluster1 := client.ObjectKey{Namespace: metav1.NamespaceDefault, Name: "Cluster1"} + km := newKeyedMutex() + + // Try to lock cluster1. + // Should work as nobody currently holds the lock for cluster1. + unlock, ok := km.TryLock(cluster1) + g.Expect(ok).To(BeTrue()) + + // Try to lock cluster1 again. + // Shouldn't work as cluster1 is already locked. + _, ok = km.TryLock(cluster1) + g.Expect(ok).To(BeFalse()) + + // Unlock cluster1. + unlock() + + // Ensure that the lock was cleaned up from the internal map. + g.Expect(km.locks).To(HaveLen(0)) + }) + + t.Run("Can lock different Clusters in parallel but each one only once", func(t *testing.T) { + g := NewWithT(t) + km := newKeyedMutex() + clusters := []client.ObjectKey{ + {Namespace: metav1.NamespaceDefault, Name: "Cluster1"}, + {Namespace: metav1.NamespaceDefault, Name: "Cluster2"}, + {Namespace: metav1.NamespaceDefault, Name: "Cluster3"}, + {Namespace: metav1.NamespaceDefault, Name: "Cluster4"}, + } + + // Run this twice to ensure Clusters can be locked again + // after they have been unlocked. 
+ for i := 0; i < 2; i++ { + unlocks := make([]unlock, 0, len(clusters)) + + // Lock all Clusters (should work). + for _, key := range clusters { + unlock, ok := km.TryLock(key) + g.Expect(ok).To(BeTrue()) + unlocks = append(unlocks, unlock) + } + + // Ensure Clusters can't be locked again. + for _, key := range clusters { + _, ok := km.TryLock(key) + g.Expect(ok).To(BeFalse()) + } + + // Unlock all Clusters. + for _, unlock := range unlocks { + unlock() + } + } + + // Ensure that the lock was cleaned up from the internal map. + g.Expect(km.locks).To(HaveLen(0)) + }) +} diff --git a/go.mod b/go.mod index b1c748ceb755..789a20bb91bb 100644 --- a/go.mod +++ b/go.mod @@ -111,7 +111,7 @@ require ( github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/prometheus/client_golang v1.13.0 github.com/prometheus/client_model v0.2.0 // indirect - github.com/prometheus/common v0.37.0 // indirect + github.com/prometheus/common v0.37.0 github.com/prometheus/procfs v0.8.0 // indirect github.com/rivo/uniseg v0.4.2 // indirect github.com/russross/blackfriday v1.5.2 // indirect @@ -150,6 +150,8 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) +require github.com/stretchr/testify v1.8.0 + require ( cloud.google.com/go/compute v1.7.0 // indirect github.com/blang/semver/v4 v4.0.0 // indirect @@ -158,4 +160,5 @@ require ( github.com/google/gnostic v0.6.9 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index 370899b4d4c5..d0adcc042592 100644 --- a/go.sum +++ b/go.sum @@ -394,6 +394,7 @@ github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -478,6 +479,7 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
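
For readers following the locking change, below is a minimal, self-contained sketch of the pattern the diff introduces in getClusterAccessor: check the shared accessor map, try to take a per-cluster lock without blocking, re-check the map, and only then do the expensive initialization. The keyedTryMutex and accessorCache names, the string placeholder values, and the create callback are illustrative stand-ins, not the cluster-api types.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedTryMutex is a simplified stand-in for the patch's keyedMutex: TryLock
// either acquires the per-key lock immediately or reports that another
// goroutine currently holds it.
type keyedTryMutex struct {
	mu    sync.Mutex
	locks map[string]struct{}
}

func newKeyedTryMutex() *keyedTryMutex {
	return &keyedTryMutex{locks: map[string]struct{}{}}
}

// TryLock returns an unlock func and true if the key was free, or (nil, false)
// if the key is already locked.
func (k *keyedTryMutex) TryLock(key string) (func(), bool) {
	k.mu.Lock()
	defer k.mu.Unlock()
	if _, held := k.locks[key]; held {
		return nil, false
	}
	k.locks[key] = struct{}{}
	return func() {
		k.mu.Lock()
		defer k.mu.Unlock()
		delete(k.locks, key)
	}, true
}

// accessorCache mimics the tracker: a map guarded by an RWMutex plus the keyed
// mutex that serializes expensive per-key initialization.
type accessorCache struct {
	lock      sync.RWMutex
	accessors map[string]string // value type is a placeholder for *clusterAccessor
	keyed     *keyedTryMutex
}

func (c *accessorCache) get(key string, create func() string) (string, error) {
	// Fast path: return an accessor that already exists.
	c.lock.RLock()
	a, ok := c.accessors[key]
	c.lock.RUnlock()
	if ok {
		return a, nil
	}

	// Slow path: take the per-key lock without blocking; callers that lose the
	// race get an error back instead of waiting.
	unlock, ok := c.keyed.TryLock(key)
	if !ok {
		return "", fmt.Errorf("accessor for %q is being created by another goroutine", key)
	}
	defer unlock()

	// Double-check: another goroutine may have finished creating the accessor
	// between our map lookup and acquiring the per-key lock.
	c.lock.RLock()
	a, ok = c.accessors[key]
	c.lock.RUnlock()
	if ok {
		return a, nil
	}

	// We won the race; do the expensive work while holding only the per-key lock.
	a = create()
	c.lock.Lock()
	c.accessors[key] = a
	c.lock.Unlock()
	return a, nil
}

func main() {
	c := &accessorCache{accessors: map[string]string{}, keyed: newKeyedTryMutex()}
	a, err := c.get("default/cluster1", func() string { return "accessor-for-cluster1" })
	fmt.Println(a, err)
}
```

With this scheme, callers like GetClient and Watch return an error quickly when another goroutine holds the per-cluster lock, rather than blocking a worker on the tracker-wide mutex; in a controller that error is expected to surface to the reconciler and be retried via the usual requeue-with-backoff path.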
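The other behavioral change is the bounded initial cache sync: WaitForCacheSync now runs against its own context derived with initialCacheSyncTimeout, and the cache is stopped on failure so its goroutine and watches don't leak. A minimal sketch of that pattern, assuming a simplified syncStopper interface and a fakeCache in place of the tracker's stoppable cache:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// initialCacheSyncTimeout mirrors the constant introduced by the patch.
const initialCacheSyncTimeout = 5 * time.Minute

// syncStopper captures just the two cache methods this sketch needs.
type syncStopper interface {
	WaitForCacheSync(ctx context.Context) bool
	Stop()
}

// waitForInitialSync bounds the initial sync with its own timeout context and
// stops the cache if the sync does not finish in time.
func waitForInitialSync(ctx context.Context, c syncStopper, cluster string) error {
	syncCtx, cancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
	defer cancel()

	if !c.WaitForCacheSync(syncCtx) {
		c.Stop()
		return fmt.Errorf("failed waiting for cache for remote cluster %s to sync: %w", cluster, syncCtx.Err())
	}
	return nil
}

// fakeCache never syncs; it just waits for the context to expire.
type fakeCache struct{ stopped bool }

func (f *fakeCache) WaitForCacheSync(ctx context.Context) bool { <-ctx.Done(); return false }
func (f *fakeCache) Stop()                                     { f.stopped = true }

func main() {
	// Use a short parent deadline so the example finishes quickly.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	fc := &fakeCache{}
	err := waitForInitialSync(ctx, fc, "default/cluster1")
	fmt.Println(err)
	fmt.Println("cache stopped:", fc.stopped, "timed out:", errors.Is(err, context.DeadlineExceeded))
}
```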