Skip to content

Commit

Permalink
adjust call sites
Browse files Browse the repository at this point in the history
  • Loading branch information
sbueringer committed Nov 14, 2022
1 parent 45f007c commit 1318edc
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 16 deletions.
17 changes: 9 additions & 8 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,29 +437,30 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
return errors.New("input.Name is required")
}

a, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
accessor, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
if err != nil {
return err
return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

// We have to lock the cluster, so that the watch is not created multiple times in parallel.
ok := t.clusterLock.TryLock(input.Cluster)
if !ok {
return errors.Wrapf(ErrClusterLocked, "failed to add watch: error getting lock for cluster")
return errors.Wrapf(ErrClusterLocked, "failed to add %s watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}
defer t.clusterLock.Unlock(input.Cluster)

if a.watches.Has(input.Name) {
t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
if accessor.watches.Has(input.Name) {
log := ctrl.LoggerFrom(ctx)
log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
return nil
}

// Need to create the watch
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, a.cache), input.EventHandler, input.Predicates...); err != nil {
return errors.Wrap(err, "error creating watch")
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, accessor.cache), input.EventHandler, input.Predicates...); err != nil {
return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

a.watches.Insert(input.Name)
accessor.watches.Insert(input.Name)

return nil
}
Expand Down
16 changes: 14 additions & 2 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,23 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.

if !kcp.ObjectMeta.DeletionTimestamp.IsZero() {
// Handle deletion reconciliation loop.
return r.reconcileDelete(ctx, cluster, kcp)
res, err = r.reconcileDelete(ctx, cluster, kcp)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
return res, err
}

// Handle normal reconciliation loop.
return r.reconcile(ctx, cluster, kcp)
res, err = r.reconcile(ctx, cluster, kcp)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
return res, err
}

func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R

for _, cluster := range clusters {
if err := r.ApplyClusterResourceSet(ctx, cluster, clusterResourceSet); err != nil {
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
}
Expand Down
18 changes: 15 additions & 3 deletions internal/controllers/machine/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,23 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re

// Handle deletion reconciliation loop.
if !m.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, cluster, m)
res, err := r.reconcileDelete(ctx, cluster, m)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
return res, err
}

// Handle normal reconciliation loop.
return r.reconcile(ctx, cluster, m)
res, err := r.reconcile(ctx, cluster, m)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
return res, err
}

func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clusterv1.Machine, options ...patch.Option) error {
Expand Down Expand Up @@ -254,7 +266,7 @@ func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clust

func (r *Reconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster, m *clusterv1.Machine) (ctrl.Result, error) {
if err := r.watchClusterNodes(ctx, cluster); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error watching nodes on target cluster")
return ctrl.Result{}, err
}

// If the Machine belongs to a cluster, add an owner reference.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re

result, err := r.reconcile(ctx, log, cluster, m)
if err != nil {
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
log.Error(err, "Failed to reconcile MachineHealthCheck")
r.recorder.Eventf(m, corev1.EventTypeWarning, "ReconcileError", "%v", err)

Expand Down Expand Up @@ -201,7 +206,6 @@ func (r *Reconciler) reconcile(ctx context.Context, logger logr.Logger, cluster
}

if err := r.watchClusterNodes(ctx, cluster); err != nil {
logger.Error(err, "error watching nodes on target cluster")
return ctrl.Result{}, err
}

Expand Down
5 changes: 5 additions & 0 deletions internal/controllers/machineset/machineset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re

result, err := r.reconcile(ctx, cluster, machineSet)
if err != nil {
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
log.Error(err, "Failed to reconcile MachineSet")
r.recorder.Eventf(machineSet, corev1.EventTypeWarning, "ReconcileError", "%v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,13 @@ func (r *DockerMachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

// Handle non-deleted machines
return r.reconcileNormal(ctx, cluster, machinePool, dockerMachinePool)
res, err = r.reconcileNormal(ctx, cluster, machinePool, dockerMachinePool)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
return res, err
}

// SetupWithManager will add watches for this controller.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,13 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// Handle non-deleted machines
return r.reconcileNormal(ctx, cluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
res, err := r.reconcileNormal(ctx, cluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
if errors.Is(err, remote.ErrClusterLocked) {
return ctrl.Result{Requeue: true}, nil
}
return res, err
}

func patchDockerMachine(ctx context.Context, patchHelper *patch.Helper, dockerMachine *infrav1.DockerMachine) error {
Expand Down

0 comments on commit 1318edc

Please sign in to comment.