From a3ca5760e8ac0832f3573d548b3a91c650d35327 Mon Sep 17 00:00:00 2001 From: Florian Gutmann Date: Fri, 1 Apr 2022 19:00:53 +0000 Subject: [PATCH] cluster-level lock for workload cluster client initialization Before this commit, workload cluster client initialization required a global lock to be held. If initialization of a single workload cluster client took time, all other reconcile-loops who require a workload cluster connection were blocked until initialization finished. Initialization of a workload cluster client can take a significant amount of time, because it requires to initialize the discovery client, which sends multiple request to the API-server. With this change initialization of a workload cluster client only requires to hold a lock for the specific cluster. This means reconciliation for other clusters is not affected by a long running workload cluster client initialization. --- controllers/remote/cluster_cache.go | 57 +++++++++++++++------ util/sync/mutex/keyedmutex.go | 77 +++++++++++++++++++++++++++++ util/sync/mutex/keyedmutex_test.go | 75 ++++++++++++++++++++++++++++ 3 files changed, 194 insertions(+), 15 deletions(-) create mode 100644 util/sync/mutex/keyedmutex.go create mode 100644 util/sync/mutex/keyedmutex_test.go diff --git a/controllers/remote/cluster_cache.go b/controllers/remote/cluster_cache.go index 92f141855e54..f16532dd5275 100644 --- a/controllers/remote/cluster_cache.go +++ b/controllers/remote/cluster_cache.go @@ -43,6 +43,7 @@ import ( clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/util/conditions" + "sigs.k8s.io/cluster-api/util/sync/mutex" ) const ( @@ -61,6 +62,7 @@ type ClusterCacheTracker struct { lock sync.RWMutex clusterAccessors map[client.ObjectKey]*clusterAccessor + clusterLock *mutex.KeyedMutex indexes []Index } @@ -102,16 +104,14 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt client: manager.GetClient(), scheme: manager.GetScheme(), clusterAccessors: make(map[client.ObjectKey]*clusterAccessor), + clusterLock: mutex.NewKeyedMutex(), indexes: options.Indexes, }, nil } // GetClient returns a cached client for the given cluster. func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) { - t.lock.Lock() - defer t.lock.Unlock() - - accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...) + accessor, err := t.getClusterAccessor(ctx, cluster, t.indexes...) if err != nil { return nil, err } @@ -135,21 +135,48 @@ func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bo return exists } -// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a -// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held). -func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) { - a := t.clusterAccessors[cluster] +// getClusterAccessor first tries to return an already-created clusterAccessor for cluster, falling back to creating a new clusterAccessor if needed. +func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) { + log := ctrl.LoggerFrom(ctx, "cluster", cluster.Name) + + loadExistingAccessor := func() *clusterAccessor { + t.lock.RLock() + defer t.lock.RUnlock() + return t.clusterAccessors[cluster] + } + storeAccessor := func(a *clusterAccessor) { + t.lock.Lock() + defer t.lock.Unlock() + t.clusterAccessors[cluster] = a + } + + // if the accessor exists, return early + a := loadExistingAccessor() + if a != nil { + return a, nil + } + + // No cluster exists, we might need to initialize one. + // Lock on the cluster to ensure only one accessor is initialized for the cluster. + unlockCluster := t.clusterLock.Lock(cluster) + defer unlockCluster() + + // While we were waiting on the cluster lock, a different goroutine holding the lock might have initialized the accessor + // for this cluster successfully. If this is the case we return it. + a = loadExistingAccessor() if a != nil { return a, nil } + // We are the one who needs to initialize it. + log.V(4).Info("creating new cluster accessor") a, err := t.newClusterAccessor(ctx, cluster, indexes...) if err != nil { + log.V(4).Info("error creating new cluster accessor") return nil, errors.Wrap(err, "error creating client and cache for remote cluster") } - - t.clusterAccessors[cluster] = a - + log.V(4).Info("storing new cluster accessor") + storeAccessor(a) return a, nil } @@ -277,14 +304,14 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error return errors.New("input.Name is required") } - t.lock.Lock() - defer t.lock.Unlock() - - a, err := t.getClusterAccessorLH(ctx, input.Cluster, t.indexes...) + a, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...) if err != nil { return err } + unlock := t.clusterLock.Lock(input.Cluster) + defer unlock() + if a.watches.Has(input.Name) { t.log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name) return nil diff --git a/util/sync/mutex/keyedmutex.go b/util/sync/mutex/keyedmutex.go new file mode 100644 index 000000000000..18a22f405988 --- /dev/null +++ b/util/sync/mutex/keyedmutex.go @@ -0,0 +1,77 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package mutex implements utilities for mutexes. +package mutex + +import "sync" + +// KeyedMutex is a mutex locking on the key provided to the Lock function. +// Only one caller can hold the lock for a specific key at a time. +type KeyedMutex struct { + locksMtx sync.Mutex + locks map[interface{}]*keyLock +} + +// NewKeyedMutex creates a new keyed mutex ready for use. +func NewKeyedMutex() *KeyedMutex { + return &KeyedMutex{ + locksMtx: sync.Mutex{}, + locks: make(map[interface{}]*keyLock), + } +} + +// keyLock is the lock for a single specific key. +type keyLock struct { + sync.Mutex + // users is the number of callers attempting to acquire the mutex, including the one currently holding it. + users uint +} + +// Unlock unlocks a currently locked key. +type Unlock func() + +// Lock locks the passed in key, blocking if the key is locked. +// Returns the unlock function to release the lock on the key. +func (k *KeyedMutex) Lock(key interface{}) Unlock { + // Get an existing keyLock for the key or create a new one and increase the number of users. + l := func() *keyLock { + k.locksMtx.Lock() + defer k.locksMtx.Unlock() + + l, ok := k.locks[key] + if !ok { + l = &keyLock{} + k.locks[key] = l + } + l.users++ + return l + }() + + l.Lock() + + // Unlocks the keyLock for the key, decreases the counter and removes the keyLock from the map if there are no more users left. + return func() { + k.locksMtx.Lock() + defer k.locksMtx.Unlock() + + l.Unlock() + l.users-- + if l.users == 0 { + delete(k.locks, key) + } + } +} diff --git a/util/sync/mutex/keyedmutex_test.go b/util/sync/mutex/keyedmutex_test.go new file mode 100644 index 000000000000..027566b55094 --- /dev/null +++ b/util/sync/mutex/keyedmutex_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutex + +import ( + "testing" + + . "github.com/onsi/gomega" +) + +func TestKeyedMutex(t *testing.T) { + t.Run("blocks on a locked key until unlocked", func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + routineStarted := make(chan bool) + routineCompleted := make(chan bool) + key := "key1" + + km := NewKeyedMutex() + unlock := km.Lock(key) + + // start a routine which tries to lock the same key + go func() { + routineStarted <- true + unlock := km.Lock(key) + unlock() + routineCompleted <- true + }() + + <-routineStarted + g.Consistently(routineCompleted).ShouldNot(Receive()) + + // routine should be able to acquire the lock for the key after we unlock + unlock() + g.Eventually(routineCompleted).Should(Receive()) + + // ensure that the lock was cleaned up from the internal map + g.Expect(km.locks).To(HaveLen(0)) + }) + + t.Run("can lock different keys without blocking", func(t *testing.T) { + g := NewWithT(t) + km := NewKeyedMutex() + keys := []string{"a", "b", "c", "d"} + unlocks := make([]Unlock, 0, len(keys)) + + // lock all keys + for _, key := range keys { + unlocks = append(unlocks, km.Lock(key)) + } + + // unlock all keys + for _, unlock := range unlocks { + unlock() + } + + // ensure that the lock was cleaned up from the internal map + g.Expect(km.locks).To(HaveLen(0)) + }) +}