diff --git a/controllers/remote/cluster_cache_healthcheck_test.go b/controllers/remote/cluster_cache_healthcheck_test.go
index 7e46428e9bff..f4e3e5a9e948
--- a/controllers/remote/cluster_cache_healthcheck_test.go
+++ b/controllers/remote/cluster_cache_healthcheck_test.go
@@ -140,7 +140,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			})
 
 			// Make sure this passes for at least for some seconds, to give the health check goroutine time to run.
-			g.Consistently(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeTrue())
+			g.Consistently(func() bool {
+				_, ok := cct.loadAccessor(testClusterKey)
+				return ok
+			}, 5*time.Second, 1*time.Second).Should(BeTrue())
 		})
 
 		t.Run("with an invalid path", func(t *testing.T) {
@@ -162,7 +165,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			})
 
 			// This should succeed after N consecutive failed requests.
-			g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
+			g.Eventually(func() bool {
+				_, ok := cct.loadAccessor(testClusterKey)
+				return ok
+			}, 5*time.Second, 1*time.Second).Should(BeFalse())
 		})
 
 		t.Run("with an invalid config", func(t *testing.T) {
@@ -193,7 +199,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			})
 
 			// This should succeed after N consecutive failed requests.
-			g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
+			g.Eventually(func() bool {
+				_, ok := cct.loadAccessor(testClusterKey)
+				return ok
+			}, 5*time.Second, 1*time.Second).Should(BeFalse())
 		})
 	})
 }
diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go
index 5da6adf1055c..020dd3807e03
--- a/controllers/remote/cluster_cache_tracker.go
+++ b/controllers/remote/cluster_cache_tracker.go
@@ -54,9 +54,14 @@ const (
 	healthCheckPollInterval       = 10 * time.Second
 	healthCheckRequestTimeout     = 5 * time.Second
 	healthCheckUnhealthyThreshold = 10
+	initialCacheSyncTimeout       = 5 * time.Minute
 	clusterCacheControllerName    = "cluster-cache-tracker"
 )
 
+// ErrClusterLocked is returned in methods that require cluster-level locking
+// if the cluster is already locked by another concurrent call.
+var ErrClusterLocked = errors.New("cluster is locked already")
+
 // ClusterCacheTracker manages client caches for workload clusters.
 type ClusterCacheTracker struct {
 	log    logr.Logger
@@ -64,9 +69,15 @@ type ClusterCacheTracker struct {
 	client client.Client
 	scheme *runtime.Scheme
 
-	lock             sync.RWMutex
+	// clusterAccessorsLock is used to lock the access to the clusterAccessors map.
+	clusterAccessorsLock sync.RWMutex
+	// clusterAccessors is the map of clusterAccessors by cluster.
 	clusterAccessors map[client.ObjectKey]*clusterAccessor
-	indexes          []Index
+	// clusterLock is a per-cluster lock used whenever we're locking for a specific cluster.
+	// E.g. for actions like creating a client or adding watches.
+	clusterLock *keyedMutex
+
+	indexes []Index
 
 	// controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
 	// This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
@@ -129,16 +140,14 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
 		client:           manager.GetClient(),
 		scheme:           manager.GetScheme(),
 		clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
+		clusterLock:      newKeyedMutex(),
 		indexes:          options.Indexes,
 	}, nil
 }
 
 // GetClient returns a cached client for the given cluster.
 func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...)
+	accessor, err := t.getClusterAccessor(ctx, cluster, t.indexes...)
 	if err != nil {
 		return nil, err
 	}
@@ -148,10 +157,7 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
 
 // GetRESTConfig returns a cached REST config for the given cluster.
 func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	accessor, err := t.getClusterAccessorLH(ctc, cluster, t.indexes...)
+	accessor, err := t.getClusterAccessor(ctc, cluster, t.indexes...)
 	if err != nil {
 		return nil, err
 	}
@@ -169,29 +175,68 @@ type clusterAccessor struct {
 
 // clusterAccessorExists returns true if a clusterAccessor exists for cluster.
 func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bool {
-	t.lock.RLock()
-	defer t.lock.RUnlock()
+	t.clusterAccessorsLock.RLock()
+	defer t.clusterAccessorsLock.RUnlock()
 
 	_, exists := t.clusterAccessors[cluster]
 	return exists
 }
 
-// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a
-// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held).
-func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
-	a := t.clusterAccessors[cluster]
-	if a != nil {
-		return a, nil
+// loadAccessor loads a clusterAccessor.
+func (t *ClusterCacheTracker) loadAccessor(cluster client.ObjectKey) (*clusterAccessor, bool) {
+	t.clusterAccessorsLock.RLock()
+	defer t.clusterAccessorsLock.RUnlock()
+
+	accessor, ok := t.clusterAccessors[cluster]
+	return accessor, ok
+}
+
+// storeAccessor stores a clusterAccessor.
+func (t *ClusterCacheTracker) storeAccessor(cluster client.ObjectKey, accessor *clusterAccessor) {
+	t.clusterAccessorsLock.Lock()
+	defer t.clusterAccessorsLock.Unlock()
+
+	t.clusterAccessors[cluster] = accessor
+}
+
+// getClusterAccessor returns a clusterAccessor for cluster.
+// It first tries to return an already-created clusterAccessor.
+// It then falls back to creating a new clusterAccessor if needed.
+// If there is already another goroutine trying to create a clusterAccessor
+// for the same cluster, an error is returned.
+func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
+	log := ctrl.LoggerFrom(ctx, "cluster", klog.KRef(cluster.Namespace, cluster.Name))
+
+	// If the clusterAccessor already exists, return early.
+	if accessor, ok := t.loadAccessor(cluster); ok {
+		return accessor, nil
 	}
 
-	a, err := t.newClusterAccessor(ctx, cluster, indexes...)
-	if err != nil {
-		return nil, errors.Wrap(err, "error creating client and cache for remote cluster")
+	// clusterAccessor doesn't exist yet, so we might have to initialize one.
+	// Lock on the cluster to ensure only one clusterAccessor is initialized
+	// for the cluster at the same time.
+	// Return an error if another goroutine is already trying to create a clusterAccessor.
+	if ok := t.clusterLock.TryLock(cluster); !ok {
+		return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
 	}
+	defer t.clusterLock.Unlock(cluster)
 
-	t.clusterAccessors[cluster] = a
+	// Between the check above and acquiring the cluster lock, a different goroutine might have initialized
+	// the clusterAccessor for this cluster successfully already. If this is the case, we return it.
+	if accessor, ok := t.loadAccessor(cluster); ok {
+		return accessor, nil
+	}
+
+	// We are the goroutine that has to initialize the clusterAccessor.
+	log.V(4).Info("Creating new cluster accessor")
+	accessor, err := t.newClusterAccessor(ctx, cluster, indexes...)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to create cluster accessor")
+	}
 
-	return a, nil
+	log.V(4).Info("Storing new cluster accessor")
+	t.storeAccessor(cluster, accessor)
+	return accessor, nil
 }
 
 // newClusterAccessor creates a new clusterAccessor.
@@ -265,7 +310,12 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 
 	// Start the cache!!!
 	go cache.Start(cacheCtx) //nolint:errcheck
-	if !cache.WaitForCacheSync(cacheCtx) {
+
+	// Wait until the cache is initially synced.
+	cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
+	defer cacheSyncCtxCancel()
+	if !cache.WaitForCacheSync(cacheSyncCtx) {
+		cache.Stop()
 		return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
 	}
 
@@ -337,8 +387,8 @@ func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.O
 
 // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
 func (t *ClusterCacheTracker) deleteAccessor(_ context.Context, cluster client.ObjectKey) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
+	t.clusterAccessorsLock.Lock()
+	defer t.clusterAccessorsLock.Unlock()
 
 	a, exists := t.clusterAccessors[cluster]
 	if !exists {
@@ -387,25 +437,30 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 		return errors.New("input.Name is required")
 	}
 
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	a, err := t.getClusterAccessorLH(ctx, input.Cluster, t.indexes...)
+	accessor, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
+	}
+
+	// We have to lock the cluster so that the watch is not created multiple times in parallel.
+	ok := t.clusterLock.TryLock(input.Cluster)
+	if !ok {
+		return errors.Wrapf(ErrClusterLocked, "failed to add %s watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
+	defer t.clusterLock.Unlock(input.Cluster)
 
-	if a.watches.Has(input.Name) {
-		t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
+	if accessor.watches.Has(input.Name) {
+		log := ctrl.LoggerFrom(ctx)
+		log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
 		return nil
 	}
 
 	// Need to create the watch
-	if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, a.cache), input.EventHandler, input.Predicates...); err != nil {
-		return errors.Wrap(err, "error creating watch")
+	if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, accessor.cache), input.EventHandler, input.Predicates...); err != nil {
+		return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
-	a.watches.Insert(input.Name)
+	accessor.watches.Insert(input.Name)
 
 	return nil
 }
@@ -472,7 +527,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 			return false, nil
 		}
 
-		if !t.clusterAccessorExists(in.cluster) {
+		if _, ok := t.loadAccessor(in.cluster); !ok {
 			// Cache for this cluster has already been cleaned up.
 			// Nothing to do, so return true.
 			return true, nil
@@ -505,7 +560,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 	// An error returned implies the health check has failed a sufficient number of
 	// times for the cluster to be considered unhealthy
 	// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
-	// happens when the cache is explicitly stopped.F
+	// happens when the cache is explicitly stopped.
 	if err != nil && err != wait.ErrWaitTimeout {
 		t.log.Error(err, "Error health checking cluster", "Cluster", klog.KRef(in.cluster.Namespace, in.cluster.Name))
 		t.deleteAccessor(ctx, in.cluster)
diff --git a/controllers/remote/keyedmutex.go b/controllers/remote/keyedmutex.go
new file mode 100644
index 000000000000..58dbd86d54c6
--- /dev/null
+++ b/controllers/remote/keyedmutex.go
@@ -0,0 +1,66 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+	"sync"
+
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// keyedMutex is a mutex locking on the key provided to the TryLock function.
+// Only one caller can hold the lock for a specific key at a time.
+// A second TryLock call returns false if the lock is already held for a key.
+type keyedMutex struct {
+	locksMtx sync.Mutex
+	locks    map[client.ObjectKey]struct{}
+}
+
+// newKeyedMutex creates a new keyed mutex ready for use.
+func newKeyedMutex() *keyedMutex {
+	return &keyedMutex{
+		locks: make(map[client.ObjectKey]struct{}),
+	}
+}
+
+// TryLock locks the passed-in key if it's not already locked.
+// A second TryLock call returns false if the lock is already held for a key.
+// In the ClusterCacheTracker case the key is the ObjectKey for a cluster.
+func (k *keyedMutex) TryLock(key client.ObjectKey) bool {
+	k.locksMtx.Lock()
+	defer k.locksMtx.Unlock()
+
+	// Check if there is already a lock for this key (e.g. Cluster).
+	if _, ok := k.locks[key]; ok {
+		// There is already a lock, return false.
+		return false
+	}
+
+	// Lock doesn't exist yet, create the lock.
+	k.locks[key] = struct{}{}
+
+	return true
+}
+
+// Unlock unlocks the key.
+func (k *keyedMutex) Unlock(key client.ObjectKey) {
+	k.locksMtx.Lock()
+	defer k.locksMtx.Unlock()
+
+	// Remove the lock if it exists.
+	delete(k.locks, key)
+}
diff --git a/controllers/remote/keyedmutex_test.go b/controllers/remote/keyedmutex_test.go
new file mode 100644
index 000000000000..c982bf093f9b
--- /dev/null
+++ b/controllers/remote/keyedmutex_test.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+	"testing"
+
+	. "github.com/onsi/gomega"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func TestKeyedMutex(t *testing.T) {
+	t.Run("Lock a Cluster and ensure a second TryLock on the same Cluster returns false", func(t *testing.T) {
+		t.Parallel()
+		g := NewWithT(t)
+
+		cluster1 := client.ObjectKey{Namespace: metav1.NamespaceDefault, Name: "Cluster1"}
+		km := newKeyedMutex()
+
+		// Try to lock cluster1.
+		// Should work as nobody currently holds the lock for cluster1.
+		g.Expect(km.TryLock(cluster1)).To(BeTrue())
+
+		// Try to lock cluster1 again.
+		// Shouldn't work as cluster1 is already locked.
+		g.Expect(km.TryLock(cluster1)).To(BeFalse())
+
+		// Unlock cluster1.
+		km.Unlock(cluster1)
+
+		// Ensure that the lock was cleaned up from the internal map.
+		g.Expect(km.locks).To(HaveLen(0))
+	})
+
+	t.Run("Can lock different Clusters in parallel but each one only once", func(t *testing.T) {
+		g := NewWithT(t)
+		km := newKeyedMutex()
+		clusters := []client.ObjectKey{
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster1"},
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster2"},
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster3"},
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster4"},
+		}
+
+		// Run this twice to ensure Clusters can be locked again
+		// after they have been unlocked.
+		for i := 0; i < 2; i++ {
+			// Lock all Clusters (should work).
+			for _, key := range clusters {
+				g.Expect(km.TryLock(key)).To(BeTrue())
+			}
+
+			// Ensure Clusters can't be locked again.
+			for _, key := range clusters {
+				g.Expect(km.TryLock(key)).To(BeFalse())
+			}
+
+			// Unlock all Clusters.
+			for _, key := range clusters {
+				km.Unlock(key)
+			}
+		}
+
+		// Ensure that the lock was cleaned up from the internal map.
+		g.Expect(km.locks).To(HaveLen(0))
+	})
+}
diff --git a/controlplane/kubeadm/internal/controllers/controller.go b/controlplane/kubeadm/internal/controllers/controller.go
index a2cb30e99109..a93fda08e739
--- a/controlplane/kubeadm/internal/controllers/controller.go
+++ b/controlplane/kubeadm/internal/controllers/controller.go
@@ -204,11 +204,25 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
 
 	if !kcp.ObjectMeta.DeletionTimestamp.IsZero() {
 		// Handle deletion reconciliation loop.
-		return r.reconcileDelete(ctx, cluster, kcp)
+		res, err = r.reconcileDelete(ctx, cluster, kcp)
+		// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+		// the current cluster due to concurrent access.
+		if errors.Is(err, remote.ErrClusterLocked) {
+			log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+			return ctrl.Result{Requeue: true}, nil
+		}
+		return res, err
 	}
 
 	// Handle normal reconciliation loop.
-	return r.reconcile(ctx, cluster, kcp)
+	res, err = r.reconcile(ctx, cluster, kcp)
+	// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+	// the current cluster due to concurrent access.
+	if errors.Is(err, remote.ErrClusterLocked) {
+		log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+		return ctrl.Result{Requeue: true}, nil
+	}
+	return res, err
 }
 
 func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error {
diff --git a/exp/addons/internal/controllers/clusterresourceset_controller.go b/exp/addons/internal/controllers/clusterresourceset_controller.go
index 0fc74041512a..a2bf532b0e42
--- a/exp/addons/internal/controllers/clusterresourceset_controller.go
+++ b/exp/addons/internal/controllers/clusterresourceset_controller.go
@@ -152,6 +152,12 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R
 
 	for _, cluster := range clusters {
 		if err := r.ApplyClusterResourceSet(ctx, cluster, clusterResourceSet); err != nil {
+			// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+			// the current cluster due to concurrent access.
+			if errors.Is(err, remote.ErrClusterLocked) {
+				log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+				return ctrl.Result{Requeue: true}, nil
+			}
 			return ctrl.Result{}, err
 		}
 	}
diff --git a/internal/controllers/machine/machine_controller.go b/internal/controllers/machine/machine_controller.go
index 4919e8935309..bfdddefeccf9
--- a/internal/controllers/machine/machine_controller.go
+++ b/internal/controllers/machine/machine_controller.go
@@ -207,11 +207,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
 
 	// Handle deletion reconciliation loop.
 	if !m.ObjectMeta.DeletionTimestamp.IsZero() {
-		return r.reconcileDelete(ctx, cluster, m)
+		res, err := r.reconcileDelete(ctx, cluster, m)
+		// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+		// the current cluster due to concurrent access.
+		if errors.Is(err, remote.ErrClusterLocked) {
+			log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+			return ctrl.Result{Requeue: true}, nil
+		}
+		return res, err
 	}
 
 	// Handle normal reconciliation loop.
-	return r.reconcile(ctx, cluster, m)
+	res, err := r.reconcile(ctx, cluster, m)
+	// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+	// the current cluster due to concurrent access.
+	if errors.Is(err, remote.ErrClusterLocked) {
+		log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+		return ctrl.Result{Requeue: true}, nil
+	}
+	return res, err
 }
 
 func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clusterv1.Machine, options ...patch.Option) error {
@@ -254,7 +268,7 @@ func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clust
 
 func (r *Reconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster, m *clusterv1.Machine) (ctrl.Result, error) {
 	if err := r.watchClusterNodes(ctx, cluster); err != nil {
-		return ctrl.Result{}, errors.Wrapf(err, "error watching nodes on target cluster")
+		return ctrl.Result{}, err
 	}
 
 	// If the machine is a stand-alone one, meaning not originated from a MachineDeployment, then set it as directly
diff --git a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
index ef9db6b195e2..3833d18fe55a
--- a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
+++ b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
@@ -174,6 +174,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
 
 	result, err := r.reconcile(ctx, log, cluster, m)
 	if err != nil {
+		// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+		// the current cluster due to concurrent access.
+		if errors.Is(err, remote.ErrClusterLocked) {
+			log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+			return ctrl.Result{Requeue: true}, nil
+		}
 		log.Error(err, "Failed to reconcile MachineHealthCheck")
 		r.recorder.Eventf(m, corev1.EventTypeWarning, "ReconcileError", "%v", err)
 
@@ -201,7 +207,6 @@ func (r *Reconciler) reconcile(ctx context.Context, logger logr.Logger, cluster
 	}
 
 	if err := r.watchClusterNodes(ctx, cluster); err != nil {
-		logger.Error(err, "error watching nodes on target cluster")
 		return ctrl.Result{}, err
 	}
 
diff --git a/internal/controllers/machineset/machineset_controller.go b/internal/controllers/machineset/machineset_controller.go
index c67338e59f16..ab4940265378
--- a/internal/controllers/machineset/machineset_controller.go
+++ b/internal/controllers/machineset/machineset_controller.go
@@ -172,6 +172,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
 
 	result, err := r.reconcile(ctx, cluster, machineSet)
 	if err != nil {
+		// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+		// the current cluster due to concurrent access.
+		if errors.Is(err, remote.ErrClusterLocked) {
+			log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+			return ctrl.Result{Requeue: true}, nil
+		}
 		log.Error(err, "Failed to reconcile MachineSet")
 		r.recorder.Eventf(machineSet, corev1.EventTypeWarning, "ReconcileError", "%v", err)
 	}
diff --git a/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go b/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go
index a5eb6f6737cb..749cb1333829
--- a/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go
+++ b/test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go
@@ -128,7 +128,14 @@ func (r *DockerMachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Re
 	}
 
 	// Handle non-deleted machines
-	return r.reconcileNormal(ctx, cluster, machinePool, dockerMachinePool)
+	res, err = r.reconcileNormal(ctx, cluster, machinePool, dockerMachinePool)
+	// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+	// the current cluster due to concurrent access.
+	if errors.Is(err, remote.ErrClusterLocked) {
+		log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+		return ctrl.Result{Requeue: true}, nil
+	}
+	return res, err
 }
 
 // SetupWithManager will add watches for this controller.
diff --git a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go
index c27e2ed3d98d..e36e9f2e59c0
--- a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go
+++ b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go
@@ -167,7 +167,14 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 	}
 
 	// Handle non-deleted machines
-	return r.reconcileNormal(ctx, cluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
+	res, err := r.reconcileNormal(ctx, cluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
+	// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+	// the current cluster due to concurrent access.
+	if errors.Is(err, remote.ErrClusterLocked) {
+		log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
+		return ctrl.Result{Requeue: true}, nil
+	}
+	return res, err
 }
 
 func patchDockerMachine(ctx context.Context, patchHelper *patch.Helper, dockerMachine *infrav1.DockerMachine) error {
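
Note for reviewers (not part of the patch): the sketch below is a minimal illustration of how a reconciler built on top of the tracker is expected to consume the new remote.ErrClusterLocked sentinel — the tracker now fails fast instead of blocking on a tracker-wide mutex, and the caller translates the contention into a requeue. The ExampleReconciler type and its Tracker field are hypothetical; only remote.ClusterCacheTracker, its GetClient method, and remote.ErrClusterLocked come from this change.

package example

import (
	"context"
	"errors"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/cluster-api/controllers/remote"
)

// ExampleReconciler is a hypothetical reconciler that reads objects from a
// workload cluster through the shared ClusterCacheTracker.
type ExampleReconciler struct {
	Tracker *remote.ClusterCacheTracker
}

func (r *ExampleReconciler) reconcileWorkload(ctx context.Context, clusterKey client.ObjectKey) (ctrl.Result, error) {
	log := ctrl.LoggerFrom(ctx)

	// GetClient now returns ErrClusterLocked while another worker holds the
	// per-cluster lock (e.g. during the initial cache sync of up to
	// initialCacheSyncTimeout), instead of blocking every worker on one mutex.
	remoteClient, err := r.Tracker.GetClient(ctx, clusterKey)
	if errors.Is(err, remote.ErrClusterLocked) {
		// Treat the lock contention as a transient condition and retry later.
		log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
		return ctrl.Result{Requeue: true}, nil
	}
	if err != nil {
		return ctrl.Result{}, err
	}

	// Use remoteClient to read or write objects in the workload cluster here.
	_ = remoteClient
	return ctrl.Result{}, nil
}

Returning ctrl.Result{Requeue: true} with a nil error requeues the object without surfacing the expected, short-lived lock contention as a reconcile error in logs, events, or metrics.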