Skip to content

Commit

Permalink
ClusterCacheTracker: non-blocking per-cluster locking
Browse files Browse the repository at this point in the history
Co-authored-by: Florian Gutmann <[email protected]>
  • Loading branch information
sbueringer and Florian Gutmann committed Nov 14, 2022
1 parent 808ca1c commit 1348144
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 49 deletions.
15 changes: 12 additions & 3 deletions controllers/remote/cluster_cache_healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
})

// Make sure this passes for at least for some seconds, to give the health check goroutine time to run.
g.Consistently(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeTrue())
g.Consistently(func() bool {
_, ok := cct.loadAccessor(testClusterKey)
return ok
}, 5*time.Second, 1*time.Second).Should(BeTrue())
})

t.Run("with an invalid path", func(t *testing.T) {
Expand All @@ -162,7 +165,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
})

// This should succeed after N consecutive failed requests.
g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
g.Eventually(func() bool {
_, ok := cct.loadAccessor(testClusterKey)
return ok
}, 5*time.Second, 1*time.Second).Should(BeFalse())
})

t.Run("with an invalid config", func(t *testing.T) {
Expand Down Expand Up @@ -193,7 +199,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
})

// This should succeed after N consecutive failed requests.
g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
g.Eventually(func() bool {
_, ok := cct.loadAccessor(testClusterKey)
return ok
}, 5*time.Second, 1*time.Second).Should(BeFalse())
})
})
}
131 changes: 93 additions & 38 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,30 @@ const (
healthCheckPollInterval = 10 * time.Second
healthCheckRequestTimeout = 5 * time.Second
healthCheckUnhealthyThreshold = 10
initialCacheSyncTimeout = 5 * time.Minute
clusterCacheControllerName = "cluster-cache-tracker"
)

// ErrClusterLocked is returned in methods that require cluster-level locking
// if the cluster is already locked by another concurrent call.
var ErrClusterLocked = errors.New("cluster is locked already")

// ClusterCacheTracker manages client caches for workload clusters.
type ClusterCacheTracker struct {
log logr.Logger
clientUncachedObjects []client.Object
client client.Client
scheme *runtime.Scheme

lock sync.RWMutex
// clusterAccessorsLock is used to lock the access to the clusterAccessors map.
clusterAccessorsLock sync.RWMutex
// clusterAccessors is the map of clusterAccessors by cluster.
clusterAccessors map[client.ObjectKey]*clusterAccessor
indexes []Index
// clusterLock is a per-cluster lock used whenever we're locking for a specific cluster.
// E.g. for actions like creating a client or adding watches.
clusterLock *keyedMutex

indexes []Index

// controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
// This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
Expand Down Expand Up @@ -129,16 +140,14 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
client: manager.GetClient(),
scheme: manager.GetScheme(),
clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
clusterLock: newKeyedMutex(),
indexes: options.Indexes,
}, nil
}

// GetClient returns a cached client for the given cluster.
func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
t.lock.Lock()
defer t.lock.Unlock()

accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...)
accessor, err := t.getClusterAccessor(ctx, cluster, t.indexes...)
if err != nil {
return nil, err
}
Expand All @@ -148,10 +157,7 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje

// GetRESTConfig returns a cached REST config for the given cluster.
func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
t.lock.Lock()
defer t.lock.Unlock()

accessor, err := t.getClusterAccessorLH(ctc, cluster, t.indexes...)
accessor, err := t.getClusterAccessor(ctc, cluster, t.indexes...)
if err != nil {
return nil, err
}
Expand All @@ -169,29 +175,68 @@ type clusterAccessor struct {

// clusterAccessorExists returns true if a clusterAccessor exists for cluster.
func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bool {
t.lock.RLock()
defer t.lock.RUnlock()
t.clusterAccessorsLock.RLock()
defer t.clusterAccessorsLock.RUnlock()

_, exists := t.clusterAccessors[cluster]
return exists
}

// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a
// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held).
func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
a := t.clusterAccessors[cluster]
if a != nil {
return a, nil
// loadAccessor loads a clusterAccessor.
func (t *ClusterCacheTracker) loadAccessor(cluster client.ObjectKey) (*clusterAccessor, bool) {
t.clusterAccessorsLock.RLock()
defer t.clusterAccessorsLock.RUnlock()

accessor, ok := t.clusterAccessors[cluster]
return accessor, ok
}

// storeAccessor stores a clusterAccessor.
func (t *ClusterCacheTracker) storeAccessor(cluster client.ObjectKey, accessor *clusterAccessor) {
t.clusterAccessorsLock.Lock()
defer t.clusterAccessorsLock.Unlock()

t.clusterAccessors[cluster] = accessor
}

// getClusterAccessor returns a clusterAccessor for cluster.
// It first tries to return an already-created clusterAccessor.
// It then falls back to create a new clusterAccessor if needed.
// If there is already another go routine trying to create a clusterAccessor
// for the same cluster, an error is returned.
func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
log := ctrl.LoggerFrom(ctx, "cluster", klog.KRef(cluster.Namespace, cluster.Name))

// If the clusterAccessor already exists, return early.
if accessor, ok := t.loadAccessor(cluster); ok {
return accessor, nil
}

a, err := t.newClusterAccessor(ctx, cluster, indexes...)
if err != nil {
return nil, errors.Wrap(err, "error creating client and cache for remote cluster")
// clusterAccessor doesn't exist yet, we might have to initialize one.
// Lock on the cluster to ensure only one clusterAccessor is initialized
// for the cluster at the same time.
// Return an error if another go routine already tries to create a clusterAccessor.
if ok := t.clusterLock.TryLock(cluster); !ok {
return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
}
defer t.clusterLock.Unlock(cluster)

t.clusterAccessors[cluster] = a
// Until we got the cluster lock a different goroutine might have initialized the clusterAccessor
// for this cluster successfully already. If this is the case we return it.
if accessor, ok := t.loadAccessor(cluster); ok {
return accessor, nil
}

// We are the go routine who has to initialize the clusterAccessor.
log.V(4).Info("Creating new cluster accessor")
accessor, err := t.newClusterAccessor(ctx, cluster, indexes...)
if err != nil {
return nil, errors.Wrap(err, "failed to create cluster accessor")
}

return a, nil
log.V(4).Info("Storing new cluster accessor")
t.storeAccessor(cluster, accessor)
return accessor, nil
}

// newClusterAccessor creates a new clusterAccessor.
Expand Down Expand Up @@ -265,7 +310,12 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl

// Start the cache!!!
go cache.Start(cacheCtx) //nolint:errcheck
if !cache.WaitForCacheSync(cacheCtx) {

// Wait until the cache is initially synced
cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
}

Expand Down Expand Up @@ -337,8 +387,8 @@ func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.O

// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
func (t *ClusterCacheTracker) deleteAccessor(_ context.Context, cluster client.ObjectKey) {
t.lock.Lock()
defer t.lock.Unlock()
t.clusterAccessorsLock.Lock()
defer t.clusterAccessorsLock.Unlock()

a, exists := t.clusterAccessors[cluster]
if !exists {
Expand Down Expand Up @@ -387,25 +437,30 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
return errors.New("input.Name is required")
}

t.lock.Lock()
defer t.lock.Unlock()

a, err := t.getClusterAccessorLH(ctx, input.Cluster, t.indexes...)
accessor, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
if err != nil {
return err
return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

// We have to lock the cluster, so that the watch is not created multiple times in parallel.
ok := t.clusterLock.TryLock(input.Cluster)
if !ok {
return errors.Wrapf(ErrClusterLocked, "failed to add %s watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}
defer t.clusterLock.Unlock(input.Cluster)

if a.watches.Has(input.Name) {
t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
if accessor.watches.Has(input.Name) {
log := ctrl.LoggerFrom(ctx)
log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
return nil
}

// Need to create the watch
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, a.cache), input.EventHandler, input.Predicates...); err != nil {
return errors.Wrap(err, "error creating watch")
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, accessor.cache), input.EventHandler, input.Predicates...); err != nil {
return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

a.watches.Insert(input.Name)
accessor.watches.Insert(input.Name)

return nil
}
Expand Down Expand Up @@ -472,7 +527,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
return false, nil
}

if !t.clusterAccessorExists(in.cluster) {
if _, ok := t.loadAccessor(in.cluster); !ok {
// Cache for this cluster has already been cleaned up.
// Nothing to do, so return true.
return true, nil
Expand Down Expand Up @@ -505,7 +560,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
// An error returned implies the health check has failed a sufficient number of
// times for the cluster to be considered unhealthy
// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
// happens when the cache is explicitly stopped.F
// happens when the cache is explicitly stopped.
if err != nil && err != wait.ErrWaitTimeout {
t.log.Error(err, "Error health checking cluster", "Cluster", klog.KRef(in.cluster.Namespace, in.cluster.Name))
t.deleteAccessor(ctx, in.cluster)
Expand Down
66 changes: 66 additions & 0 deletions controllers/remote/keyedmutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package remote

import (
"sync"

"sigs.k8s.io/controller-runtime/pkg/client"
)

// keyedMutex is a mutex locking on the key provided to the Lock function.
// Only one caller can hold the lock for a specific key at a time.
// A second Lock call if the lock is already held for a key returns false.
type keyedMutex struct {
locksMtx sync.Mutex
locks map[client.ObjectKey]struct{}
}

// newKeyedMutex creates a new keyed mutex ready for use.
func newKeyedMutex() *keyedMutex {
return &keyedMutex{
locks: make(map[client.ObjectKey]struct{}),
}
}

// TryLock locks the passed in key if it's not already locked.
// A second Lock call if the lock is already held for a key returns false.
// In the ClusterCacheTracker case the key is the ObjectKey for a cluster.
func (k *keyedMutex) TryLock(key client.ObjectKey) bool {
k.locksMtx.Lock()
defer k.locksMtx.Unlock()

// Check if there is already a lock for this key (e.g. Cluster).
if _, ok := k.locks[key]; ok {
// There is already a lock, return false.
return false
}

// Lock doesn't exist yet, create the lock.
k.locks[key] = struct{}{}

return true
}

// Unlock unlocks the key.
func (k *keyedMutex) Unlock(key client.ObjectKey) {
k.locksMtx.Lock()
defer k.locksMtx.Unlock()

// Remove the lock if it exists.
delete(k.locks, key)
}
Loading

0 comments on commit 1348144

Please sign in to comment.