From 051b00606dfacd61fbff687155e485040c1b0461 Mon Sep 17 00:00:00 2001
From: Stefan Bueringer
Date: Thu, 25 May 2023 08:17:28 +0200
Subject: [PATCH] Use ClusterCacheTracker consistently (instead of NewClusterClient)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stefan Büringer buringerst@vmware.com
---
 bootstrap/kubeadm/controllers/alias.go | 4 +
 .../controllers/kubeadmconfig_controller.go | 37 +++++----
 .../kubeadmconfig_controller_test.go | 75 ++++++++++---------
 bootstrap/kubeadm/main.go | 36 ++++++++-
 controllers/remote/cluster_cache_tracker.go | 17 ++++-
 controlplane/kubeadm/main.go | 12 +--
 .../controllers/machinepool_controller.go | 20 ++++-
 .../machinepool_controller_noderef.go | 3 +-
 .../machinepool_controller_phases_test.go | 31 ++++++--
 .../machineset/machineset_controller.go | 1 -
 main.go | 14 ++--
 test/infrastructure/docker/main.go | 12 +--
 12 files changed, 179 insertions(+), 83 deletions(-)

diff --git a/bootstrap/kubeadm/controllers/alias.go b/bootstrap/kubeadm/controllers/alias.go index 7f8b99b136ca..199c1a655957 100644 --- a/bootstrap/kubeadm/controllers/alias.go +++ b/bootstrap/kubeadm/controllers/alias.go @@ -25,6 +25,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" kubeadmbootstrapcontrollers "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/controllers" + "sigs.k8s.io/cluster-api/controllers/remote" ) // Following types provides access to reconcilers implemented in internal/controllers, thus @@ -39,6 +40,8 @@ const ( type KubeadmConfigReconciler struct { Client client.Client + Tracker *remote.ClusterCacheTracker + // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string @@ -50,6 +53,7 @@ type KubeadmConfigReconciler struct { func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { return (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{ Client: r.Client, + Tracker: r.Tracker, WatchFilterValue: r.WatchFilterValue, TokenTTL: r.TokenTTL, }).SetupWithManager(ctx, mgr, options) diff --git a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go index 45fe463f5d5b..be97c4d71910 100644 --- a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go +++ b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go @@ -59,11 +59,6 @@ import ( "sigs.k8s.io/cluster-api/util/secret" ) -const ( - // KubeadmConfigControllerName defines the controller used when creating clients. - KubeadmConfigControllerName = "kubeadmconfig-controller" -) - const ( // DefaultTokenTTL is the default TTL used for tokens. DefaultTokenTTL = 15 * time.Minute @@ -82,6 +77,7 @@ type InitLocker interface { // KubeadmConfigReconciler reconciles a KubeadmConfig object. type KubeadmConfigReconciler struct { Client client.Client + Tracker *remote.ClusterCacheTracker KubeadmInitLock InitLocker // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string // TokenTTL is the amount of time a bootstrap token (and therefore a KubeadmConfig) will be valid. TokenTTL time.Duration - - remoteClientGetter remote.ClusterClientGetter } // Scope is a scoped struct used during reconciliation.
@@ -106,9 +100,6 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl if r.KubeadmInitLock == nil { r.KubeadmInitLock = locking.NewControlPlaneInitMutex(mgr.GetClient()) } - if r.remoteClientGetter == nil { - r.remoteClientGetter = remote.NewClusterClient - } if r.TokenTTL == 0 { r.TokenTTL = DefaultTokenTTL } @@ -239,6 +230,25 @@ func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } }() + + // Ignore deleted KubeadmConfigs. + if !config.DeletionTimestamp.IsZero() { + return ctrl.Result{}, nil + } + + res, err := r.reconcile(ctx, scope, cluster, config, configOwner) + if err != nil && errors.Is(err, remote.ErrClusterLocked) { + // Requeue if the reconcile failed because the ClusterCacheTracker was locked for + // the current cluster because of concurrent access. + log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker") + return ctrl.Result{Requeue: true}, nil + } + return res, err +} + +func (r *KubeadmConfigReconciler) reconcile(ctx context.Context, scope *Scope, cluster *clusterv1.Cluster, config *bootstrapv1.KubeadmConfig, configOwner *bsutil.ConfigOwner) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + // Ensure the bootstrap secret associated with this KubeadmConfig has the correct ownerReference. if err := r.ensureBootstrapSecretOwnersRef(ctx, scope); err != nil { return ctrl.Result{}, err @@ -305,9 +315,8 @@ func (r *KubeadmConfigReconciler) refreshBootstrapToken(ctx context.Context, con log := ctrl.LoggerFrom(ctx) token := config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token - remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster)) + remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) if err != nil { - log.Error(err, "Error creating remote cluster client") return ctrl.Result{}, err } @@ -323,7 +332,7 @@ func (r *KubeadmConfigReconciler) refreshBootstrapToken(ctx context.Context, con func (r *KubeadmConfigReconciler) rotateMachinePoolBootstrapToken(ctx context.Context, config *bootstrapv1.KubeadmConfig, cluster *clusterv1.Cluster, scope *Scope) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) log.V(2).Info("Config is owned by a MachinePool, checking if token should be rotated") - remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster)) + remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) if err != nil { return ctrl.Result{}, err } @@ -928,7 +937,7 @@ func (r *KubeadmConfigReconciler) reconcileDiscovery(ctx context.Context, cluste // if BootstrapToken already contains a token, respect it; otherwise create a new bootstrap token for the node to join if config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token == "" { - remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster)) + remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) if err != nil { return ctrl.Result{}, err } diff --git a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go index 2aabce32c477..77deb624f067 100644 --- a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go +++ b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go @@ -25,6 +25,7 @@ import ( "time" ignition "github.com/flatcar/ignition/config/v2_3" + "github.com/go-logr/logr" . 
"github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,13 +35,14 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/yaml" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1" bootstrapbuilder "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/builder" - fakeremote "sigs.k8s.io/cluster-api/controllers/remote/fake" + "sigs.k8s.io/cluster-api/controllers/remote" expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1" "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/internal/test/builder" @@ -495,9 +497,9 @@ func TestKubeadmConfigReconciler_Reconcile_GenerateCloudConfigData(t *testing.T) myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, } request := ctrl.Request{ @@ -556,9 +558,9 @@ func TestKubeadmConfigReconciler_Reconcile_ErrorIfJoiningControlPlaneHasInvalidC myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, } request := ctrl.Request{ @@ -677,9 +679,9 @@ func TestReconcileIfJoinCertificatesAvailableConditioninNodesAndControlPlaneIsRe objects = append(objects, createSecrets(t, cluster, config)...) myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, } request := ctrl.Request{ @@ -754,9 +756,9 @@ func TestReconcileIfJoinNodePoolsAndControlPlaneIsReady(t *testing.T) { objects = append(objects, createSecrets(t, cluster, config)...) 
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, } request := ctrl.Request{ @@ -854,9 +856,9 @@ func TestBootstrapDataFormat(t *testing.T) { myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, } request := ctrl.Request{ NamespacedName: client.ObjectKey{ @@ -934,9 +936,9 @@ func TestKubeadmConfigSecretCreatedStatusNotPatched(t *testing.T) { objects = append(objects, createSecrets(t, cluster, initConfig)...) myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, } request := ctrl.Request{ NamespacedName: client.ObjectKey{ @@ -1011,10 +1013,10 @@ func TestBootstrapTokenTTLExtension(t *testing.T) { objects = append(objects, createSecrets(t, cluster, initConfig)...) myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}, &clusterv1.Machine{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - TokenTTL: DefaultTokenTTL, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, + TokenTTL: DefaultTokenTTL, } request := ctrl.Request{ NamespacedName: client.ObjectKey{ @@ -1212,10 +1214,10 @@ func TestBootstrapTokenRotationMachinePool(t *testing.T) { objects = append(objects, createSecrets(t, cluster, initConfig)...) myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}, &expv1.MachinePool{}).Build() k := &KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - TokenTTL: DefaultTokenTTL, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, + TokenTTL: DefaultTokenTTL, } request := ctrl.Request{ NamespacedName: client.ObjectKey{ @@ -1368,12 +1370,6 @@ func TestBootstrapTokenRotationMachinePool(t *testing.T) { // Ensure the discovery portion of the JoinConfiguration gets generated correctly. 
func TestKubeadmConfigReconciler_Reconcile_DiscoveryReconcileBehaviors(t *testing.T) { - k := &KubeadmConfigReconciler{ - Client: fake.NewClientBuilder().Build(), - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, - } - caHash := []string{"...."} bootstrapToken := bootstrapv1.Discovery{ BootstrapToken: &bootstrapv1.BootstrapTokenDiscovery{ @@ -1499,6 +1495,13 @@ func TestKubeadmConfigReconciler_Reconcile_DiscoveryReconcileBehaviors(t *testin t.Run(tc.name, func(t *testing.T) { g := NewWithT(t) + fakeClient := fake.NewClientBuilder().Build() + k := &KubeadmConfigReconciler{ + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: tc.cluster.Name, Namespace: tc.cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, + } + res, err := k.reconcileDiscovery(ctx, tc.cluster, tc.config, secret.Certificates{}) g.Expect(res.IsZero()).To(BeTrue()) g.Expect(err).NotTo(HaveOccurred()) @@ -1710,9 +1713,9 @@ func TestKubeadmConfigReconciler_Reconcile_AlwaysCheckCAVerificationUnlessReques myclient := fake.NewClientBuilder().WithObjects(objects...).Build() reconciler := KubeadmConfigReconciler{ - Client: myclient, - KubeadmInitLock: &myInitLocker{}, - remoteClientGetter: fakeremote.NewClusterClient, + Client: myclient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}), + KubeadmInitLock: &myInitLocker{}, } wc := newWorkerJoinKubeadmConfig(metav1.NamespaceDefault, "worker-join-cfg") diff --git a/bootstrap/kubeadm/main.go b/bootstrap/kubeadm/main.go index 699f166b53e6..d7703cc58ccd 100644 --- a/bootstrap/kubeadm/main.go +++ b/bootstrap/kubeadm/main.go @@ -55,8 +55,9 @@ import ( ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") + controllerName = "cluster-api-kubeadm-bootstrap-manager" ) func init() { @@ -80,6 +81,7 @@ var ( watchFilterValue string watchNamespace string profilerAddress string + clusterConcurrency int kubeadmConfigConcurrency int syncPeriod time.Duration restConfigQPS float32 @@ -117,6 +119,9 @@ func InitFlags(fs *pflag.FlagSet) { fs.StringVar(&profilerAddress, "profiler-address", "", "Bind address to expose the pprof profiler (e.g. 
localhost:6060)") + fs.IntVar(&clusterConcurrency, "cluster-concurrency", 10, + "Number of clusters to process simultaneously") + fs.IntVar(&kubeadmConfigConcurrency, "kubeadmconfig-concurrency", 10, "Number of kubeadm configs to process simultaneously") @@ -166,7 +171,7 @@ func main() { restConfig := ctrl.GetConfigOrDie() restConfig.QPS = restConfigQPS restConfig.Burst = restConfigBurst - restConfig.UserAgent = remote.DefaultClusterAPIUserAgent("cluster-api-kubeadm-bootstrap-manager") + restConfig.UserAgent = remote.DefaultClusterAPIUserAgent(controllerName) tlsOptionOverrides, err := flags.GetTLSOptionOverrideFuncs(tlsOptions) if err != nil { @@ -245,8 +250,33 @@ func setupChecks(mgr ctrl.Manager) { } func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { + // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers + // requiring a connection to a remote cluster + log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker") + tracker, err := remote.NewClusterCacheTracker( + mgr, + remote.ClusterCacheTrackerOptions{ + ControllerName: controllerName, + Log: &log, + Indexes: remote.DefaultIndexes, + }, + ) + if err != nil { + setupLog.Error(err, "unable to create cluster cache tracker") + os.Exit(1) + } + if err := (&remote.ClusterCacheReconciler{ + Client: mgr.GetClient(), + Tracker: tracker, + WatchFilterValue: watchFilterValue, + }).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") + os.Exit(1) + } + if err := (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{ Client: mgr.GetClient(), + Tracker: tracker, WatchFilterValue: watchFilterValue, TokenTTL: tokenTTL, }).SetupWithManager(ctx, mgr, concurrency(kubeadmConfigConcurrency)); err != nil { diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index f506eed2603e..685a89eb9123 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -81,6 +81,10 @@ type ClusterCacheTracker struct { indexes []Index + // controllerName is the name of the controller. + // This is used to calculate the user agent string. + controllerName string + // controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker. // This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set. // This information will be used to detected if the controller is running on a workload cluster, so @@ -100,6 +104,11 @@ type ClusterCacheTrackerOptions struct { // Defaults to never caching ConfigMap and Secret if not set. ClientUncachedObjects []client.Object Indexes []Index + + // ControllerName is the name of the controller. + // This is used to calculate the user agent string. + // If not set, it defaults to "cluster-cache-tracker". 
+ ControllerName string } func setDefaultOptions(opts *ClusterCacheTrackerOptions) { @@ -120,6 +129,11 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) { func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOptions) (*ClusterCacheTracker, error) { setDefaultOptions(&options) + controllerName := options.ControllerName + if controllerName == "" { + controllerName = clusterCacheControllerName + } + var controllerPodMetadata *metav1.ObjectMeta podNamespace := os.Getenv("POD_NAMESPACE") podName := os.Getenv("POD_NAME") @@ -136,6 +150,7 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt } return &ClusterCacheTracker{ + controllerName: controllerName, controllerPodMetadata: controllerPodMetadata, log: *options.Log, clientUncachedObjects: options.ClientUncachedObjects, @@ -257,7 +272,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl log := ctrl.LoggerFrom(ctx) // Get a rest config for the remote cluster - config, err := RESTConfig(ctx, clusterCacheControllerName, t.client, cluster) + config, err := RESTConfig(ctx, t.controllerName, t.client, cluster) if err != nil { return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String()) } diff --git a/controlplane/kubeadm/main.go b/controlplane/kubeadm/main.go index a0807f625724..af3b01944c1c 100644 --- a/controlplane/kubeadm/main.go +++ b/controlplane/kubeadm/main.go @@ -59,8 +59,9 @@ import ( ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") + controllerName = "cluster-api-kubeadm-control-plane-manager" ) func init() { @@ -174,7 +175,7 @@ func main() { restConfig := ctrl.GetConfigOrDie() restConfig.QPS = restConfigQPS restConfig.Burst = restConfigBurst - restConfig.UserAgent = remote.DefaultClusterAPIUserAgent("cluster-api-kubeadm-control-plane-manager") + restConfig.UserAgent = remote.DefaultClusterAPIUserAgent(controllerName) tlsOptionOverrides, err := flags.GetTLSOptionOverrideFuncs(tlsOptions) if err != nil { @@ -257,8 +258,9 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { // requiring a connection to a remote cluster log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker") tracker, err := remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{ - Log: &log, - Indexes: remote.DefaultIndexes, + ControllerName: controllerName, + Log: &log, + Indexes: remote.DefaultIndexes, ClientUncachedObjects: []client.Object{ &corev1.ConfigMap{}, &corev1.Secret{}, diff --git a/exp/internal/controllers/machinepool_controller.go b/exp/internal/controllers/machinepool_controller.go index d431206cc7a5..3b31542da6eb 100644 --- a/exp/internal/controllers/machinepool_controller.go +++ b/exp/internal/controllers/machinepool_controller.go @@ -189,11 +189,25 @@ func (r *MachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Handle deletion reconciliation loop. if !mp.ObjectMeta.DeletionTimestamp.IsZero() { - return r.reconcileDelete(ctx, cluster, mp) + res, err := r.reconcileDelete(ctx, cluster, mp) + // Requeue if the reconcile failed because the ClusterCacheTracker was locked for + // the current cluster because of concurrent access. + if errors.Is(err, remote.ErrClusterLocked) { + log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker") + return ctrl.Result{Requeue: true}, nil + } + return res, err } // Handle normal reconciliation loop. 
- return r.reconcile(ctx, cluster, mp) + res, err := r.reconcile(ctx, cluster, mp) + // Requeue if the reconcile failed because the ClusterCacheTracker was locked for + // the current cluster because of concurrent access. + if errors.Is(err, remote.ErrClusterLocked) { + log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker") + return ctrl.Result{Requeue: true}, nil + } + return res, err } func (r *MachinePoolReconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) { @@ -249,7 +263,7 @@ func (r *MachinePoolReconciler) reconcileDeleteNodes(ctx context.Context, cluste return nil } - clusterClient, err := remote.NewClusterClient(ctx, MachinePoolControllerName, r.Client, util.ObjectKey(cluster)) + clusterClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) if err != nil { return err } diff --git a/exp/internal/controllers/machinepool_controller_noderef.go b/exp/internal/controllers/machinepool_controller_noderef.go index 14c40ce64add..f46d1b77e718 100644 --- a/exp/internal/controllers/machinepool_controller_noderef.go +++ b/exp/internal/controllers/machinepool_controller_noderef.go @@ -27,7 +27,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" - "sigs.k8s.io/cluster-api/controllers/remote" expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1" "sigs.k8s.io/cluster-api/internal/util/taints" "sigs.k8s.io/cluster-api/util" @@ -72,7 +71,7 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster * return ctrl.Result{}, nil } - clusterClient, err := remote.NewClusterClient(ctx, MachinePoolControllerName, r.Client, util.ObjectKey(cluster)) + clusterClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) if err != nil { return ctrl.Result{}, err } diff --git a/exp/internal/controllers/machinepool_controller_phases_test.go b/exp/internal/controllers/machinepool_controller_phases_test.go index a7236d99d5a8..9ab1290ce417 100644 --- a/exp/internal/controllers/machinepool_controller_phases_test.go +++ b/exp/internal/controllers/machinepool_controller_phases_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/go-logr/logr" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,8 +32,10 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/controllers/remote" expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1" "sigs.k8s.io/cluster-api/internal/test/builder" "sigs.k8s.io/cluster-api/util/kubeconfig" @@ -222,8 +225,10 @@ func TestReconcileMachinePoolPhases(t *testing.T) { // Set NodeRef. 
machinepool.Status.NodeRefs = []corev1.ObjectReference{{Kind: "Node", Name: "machinepool-test-node"}} + fakeClient := fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build() r := &MachinePoolReconciler{ - Client: fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build(), + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: defaultCluster.Name, Namespace: defaultCluster.Namespace}), } res, err := r.reconcile(ctx, defaultCluster, machinepool) @@ -277,8 +282,10 @@ func TestReconcileMachinePoolPhases(t *testing.T) { // Set NodeRef. machinepool.Status.NodeRefs = []corev1.ObjectReference{{Kind: "Node", Name: "machinepool-test-node"}} + fakeClient := fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build() r := &MachinePoolReconciler{ - Client: fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build(), + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: defaultCluster.Name, Namespace: defaultCluster.Namespace}), } res, err := r.reconcile(ctx, defaultCluster, machinepool) @@ -350,8 +357,10 @@ func TestReconcileMachinePoolPhases(t *testing.T) { // Set NodeRef. 
machinepool.Status.NodeRefs = []corev1.ObjectReference{{Kind: "Node", Name: "machinepool-test-node"}} + fakeClient := fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build() r := &MachinePoolReconciler{ - Client: fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build(), + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: defaultCluster.Name, Namespace: defaultCluster.Namespace}), } res, err := r.reconcile(ctx, defaultCluster, machinepool) @@ -403,8 +412,10 @@ func TestReconcileMachinePoolPhases(t *testing.T) { {Kind: "Node", Name: "machinepool-test-node-3"}, } + fakeClient := fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build() r := &MachinePoolReconciler{ - Client: fake.NewClientBuilder().WithObjects(defaultCluster, defaultKubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build(), + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: defaultCluster.Name, Namespace: defaultCluster.Namespace}), } res, err := r.reconcile(ctx, defaultCluster, machinepool) @@ -1173,8 +1184,10 @@ func TestReconcileMachinePoolScaleToFromZero(t *testing.T) { err = unstructured.SetNestedField(infraConfig.Object, int64(1), "status", "replicas") g.Expect(err).NotTo(HaveOccurred()) + fakeClient := fake.NewClientBuilder().WithObjects(testCluster, kubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build() r := &MachinePoolReconciler{ - Client: fake.NewClientBuilder().WithObjects(testCluster, kubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build(), + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), env.GetClient(), env.GetClient().Scheme(), client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}), recorder: record.NewFakeRecorder(32), } @@ -1228,8 +1241,10 @@ func TestReconcileMachinePoolScaleToFromZero(t *testing.T) { err = unstructured.SetNestedField(infraConfig.Object, int64(0), "status", "replicas") g.Expect(err).NotTo(HaveOccurred()) + fakeClient := fake.NewClientBuilder().WithObjects(testCluster, kubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build() r := &MachinePoolReconciler{ - Client: fake.NewClientBuilder().WithObjects(testCluster, kubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build(), + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), env.GetClient(), env.GetClient().Scheme(), client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}), recorder: record.NewFakeRecorder(32), } @@ -1357,8 +1372,10 @@ func 
TestReconcileMachinePoolScaleToFromZero(t *testing.T) { err = unstructured.SetNestedField(infraConfig.Object, int64(1), "status", "replicas") g.Expect(err).NotTo(HaveOccurred()) + fakeClient := fake.NewClientBuilder().WithObjects(testCluster, kubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build() r := &MachinePoolReconciler{ - Client: fake.NewClientBuilder().WithObjects(testCluster, kubeconfigSecret, machinepool, bootstrapConfig, infraConfig, builder.TestBootstrapConfigCRD, builder.TestInfrastructureMachineTemplateCRD).Build(), + Client: fakeClient, + Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), env.GetClient(), env.GetClient().Scheme(), client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}), recorder: record.NewFakeRecorder(32), } diff --git a/internal/controllers/machineset/machineset_controller.go b/internal/controllers/machineset/machineset_controller.go index 4a3e76ae7693..efa6558ac020 100644 --- a/internal/controllers/machineset/machineset_controller.go +++ b/internal/controllers/machineset/machineset_controller.go @@ -185,7 +185,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker") return ctrl.Result{Requeue: true}, nil } - log.Error(err, "Failed to reconcile MachineSet") r.recorder.Eventf(machineSet, corev1.EventTypeWarning, "ReconcileError", "%v", err) } return result, err diff --git a/main.go b/main.go index 7f1e4f363329..bb1d6aa595c4 100644 --- a/main.go +++ b/main.go @@ -72,9 +72,10 @@ import ( ) var ( - catalog = runtimecatalog.New() - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + catalog = runtimecatalog.New() + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") + controllerName = "cluster-api-controller-manager" // flags. metricsBindAddr string @@ -228,7 +229,7 @@ func main() { restConfig := ctrl.GetConfigOrDie() restConfig.QPS = restConfigQPS restConfig.Burst = restConfigBurst - restConfig.UserAgent = remote.DefaultClusterAPIUserAgent("cluster-api-controller-manager") + restConfig.UserAgent = remote.DefaultClusterAPIUserAgent(controllerName) minVer := version.MinimumKubernetesVersion if feature.Gates.Enabled(feature.ClusterTopology) { @@ -331,8 +332,9 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { tracker, err := remote.NewClusterCacheTracker( mgr, remote.ClusterCacheTrackerOptions{ - Log: &log, - Indexes: remote.DefaultIndexes, + ControllerName: controllerName, + Log: &log, + Indexes: remote.DefaultIndexes, }, ) if err != nil { diff --git a/test/infrastructure/docker/main.go b/test/infrastructure/docker/main.go index a3c6891072dc..8ee234cc8854 100644 --- a/test/infrastructure/docker/main.go +++ b/test/infrastructure/docker/main.go @@ -60,8 +60,9 @@ import ( ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") + controllerName = "cluster-api-docker-controller-manager" // flags. 
metricsBindAddr string @@ -172,7 +173,7 @@ func main() { restConfig := ctrl.GetConfigOrDie() restConfig.QPS = restConfigQPS restConfig.Burst = restConfigBurst - restConfig.UserAgent = remote.DefaultClusterAPIUserAgent("cluster-api-docker-controller-manager") + restConfig.UserAgent = remote.DefaultClusterAPIUserAgent(controllerName) tlsOptionOverrides, err := flags.GetTLSOptionOverrideFuncs(tlsOptions) if err != nil { @@ -262,8 +263,9 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { tracker, err := remote.NewClusterCacheTracker( mgr, remote.ClusterCacheTrackerOptions{ - Log: &log, - Indexes: remote.DefaultIndexes, + ControllerName: controllerName, + Log: &log, + Indexes: remote.DefaultIndexes, }, ) if err != nil {
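The manager-side wiring this patch repeats in bootstrap/kubeadm/main.go, controlplane/kubeadm/main.go, main.go, and the docker provider follows one shape: build a single ClusterCacheTracker (now carrying a ControllerName that also feeds the remote user agent), register a ClusterCacheReconciler alongside it, and hand the tracker to every reconciler that needs a client for a workload cluster. The following is a minimal sketch of that shape for the kubeadm bootstrap manager, not the real main.go: the flag-backed values are hard-coded, scheme registration, metrics, webhooks, and TLS setup are omitted, and the public bootstrap/kubeadm/controllers alias package is assumed as the import.

package main

import (
	"context"
	"os"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"

	kubeadmbootstrapcontrollers "sigs.k8s.io/cluster-api/bootstrap/kubeadm/controllers"
	"sigs.k8s.io/cluster-api/controllers/remote"
)

var (
	setupLog       = ctrl.Log.WithName("setup")
	controllerName = "cluster-api-kubeadm-bootstrap-manager"

	// Stand-ins for the flag-backed values in the real main.go.
	watchFilterValue         = ""
	clusterConcurrency       = 10
	kubeadmConfigConcurrency = 10
	tokenTTL                 = 15 * time.Minute
)

func main() {
	restConfig := ctrl.GetConfigOrDie()
	restConfig.UserAgent = remote.DefaultClusterAPIUserAgent(controllerName)

	// NOTE: the real main.go also configures the scheme, metrics, leader election,
	// webhooks, and TLS options here.
	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	ctx := ctrl.SetupSignalHandler()
	setupReconcilers(ctx, mgr)

	if err := mgr.Start(ctx); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
	// One tracker per manager; ControllerName ends up in the user agent of the
	// per-cluster clients it creates.
	log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker")
	tracker, err := remote.NewClusterCacheTracker(
		mgr,
		remote.ClusterCacheTrackerOptions{
			ControllerName: controllerName,
			Log:            &log,
			Indexes:        remote.DefaultIndexes,
		},
	)
	if err != nil {
		setupLog.Error(err, "unable to create cluster cache tracker")
		os.Exit(1)
	}

	// The ClusterCacheReconciler keeps the tracker's accessors in sync with Cluster lifecycle.
	if err := (&remote.ClusterCacheReconciler{
		Client:           mgr.GetClient(),
		Tracker:          tracker,
		WatchFilterValue: watchFilterValue,
	}).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
		os.Exit(1)
	}

	// The tracker replaces the old remote.NewClusterClient calls inside the reconciler.
	if err := (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
		Client:           mgr.GetClient(),
		Tracker:          tracker,
		WatchFilterValue: watchFilterValue,
		TokenTTL:         tokenTTL,
	}).SetupWithManager(ctx, mgr, concurrency(kubeadmConfigConcurrency)); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "KubeadmConfig")
		os.Exit(1)
	}
}

// concurrency converts a flag value into controller-runtime options.
func concurrency(c int) controller.Options {
	return controller.Options{MaxConcurrentReconciles: c}
}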
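Inside the reconcilers the change is mechanical: r.remoteClientGetter / remote.NewClusterClient becomes r.Tracker.GetClient, and remote.ErrClusterLocked returned by a busy tracker is translated into a plain requeue rather than surfaced as an error. Below is a condensed, hypothetical reconciler showing just that pattern; the real Reconcile methods of course take a ctrl.Request and fetch their objects first.

package controllers

import (
	"context"
	"errors"

	ctrl "sigs.k8s.io/controller-runtime"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/cluster-api/util"
)

// ExampleReconciler is a hypothetical reconciler wired like the KubeadmConfig
// and MachinePool reconcilers in this patch: it holds the shared tracker
// instead of a remoteClientGetter.
type ExampleReconciler struct {
	Tracker *remote.ClusterCacheTracker
}

func (r *ExampleReconciler) Reconcile(ctx context.Context, cluster *clusterv1.Cluster) (ctrl.Result, error) {
	log := ctrl.LoggerFrom(ctx)

	res, err := r.reconcileNormal(ctx, cluster)
	if err != nil && errors.Is(err, remote.ErrClusterLocked) {
		// Another worker holds the tracker lock for this cluster; retry soon
		// instead of reporting a reconcile error.
		log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
		return ctrl.Result{Requeue: true}, nil
	}
	return res, err
}

func (r *ExampleReconciler) reconcileNormal(ctx context.Context, cluster *clusterv1.Cluster) (ctrl.Result, error) {
	// GetClient returns a cached client for the workload cluster and surfaces
	// remote.ErrClusterLocked while another worker is building the accessor.
	remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
	if err != nil {
		return ctrl.Result{}, err
	}

	// Use remoteClient against the workload cluster, e.g. to refresh bootstrap
	// tokens or to look up Nodes; elided here.
	_ = remoteClient
	return ctrl.Result{}, nil
}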
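The tests follow suit: instead of the fakeremote.NewClusterClient getter, they build a remote.NewTestClusterCacheTracker around the same fake client, keyed to the test cluster, so Tracker.GetClient simply hands the fake client back. The sketch below assumes it sits next to the existing kubeadmconfig controller tests; KubeadmConfigReconciler and the myInitLocker helper come from that package, and the test name and objects are placeholders.

package controllers

import (
	"testing"

	"github.com/go-logr/logr"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
	"sigs.k8s.io/controller-runtime/pkg/log"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
	"sigs.k8s.io/cluster-api/controllers/remote"
)

func TestReconcilerUsesTestTracker(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = clusterv1.AddToScheme(scheme)
	_ = bootstrapv1.AddToScheme(scheme)

	cluster := &clusterv1.Cluster{
		ObjectMeta: metav1.ObjectMeta{Name: "my-cluster", Namespace: metav1.NamespaceDefault},
	}

	// The fake client doubles as the "remote" client: the test tracker returns it
	// for the given cluster key instead of dialing a workload cluster.
	myclient := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(cluster).
		WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).
		Build()

	k := &KubeadmConfigReconciler{
		Client: myclient,
		Tracker: remote.NewTestClusterCacheTracker(
			logr.New(log.NullLogSink{}),
			myclient,
			myclient.Scheme(),
			client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace},
		),
		KubeadmInitLock: &myInitLocker{},
	}

	// Drive k.Reconcile with ctrl.Requests as in the existing tests; elided here.
	_ = k
}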