From 6700138aeb107773f09b291ea1fcb1f9f9f188bc Mon Sep 17 00:00:00 2001 From: Christian Schlotter Date: Thu, 3 Aug 2023 16:45:00 +0200 Subject: [PATCH] Add separate concurrency flag for cluster cache tracker --- bootstrap/kubeadm/main.go | 48 +++++++++++++---------- controlplane/kubeadm/main.go | 6 ++- main.go | 62 ++++++++++++++++-------------- test/infrastructure/docker/main.go | 42 +++++++++++--------- 4 files changed, 88 insertions(+), 70 deletions(-) diff --git a/bootstrap/kubeadm/main.go b/bootstrap/kubeadm/main.go index 819a5c31ccec..89623e6166ec 100644 --- a/bootstrap/kubeadm/main.go +++ b/bootstrap/kubeadm/main.go @@ -76,26 +76,27 @@ func init() { } var ( - metricsBindAddr string - enableLeaderElection bool - leaderElectionLeaseDuration time.Duration - leaderElectionRenewDeadline time.Duration - leaderElectionRetryPeriod time.Duration - watchFilterValue string - watchNamespace string - profilerAddress string - enableContentionProfiling bool - clusterConcurrency int - kubeadmConfigConcurrency int - syncPeriod time.Duration - restConfigQPS float32 - restConfigBurst int - webhookPort int - webhookCertDir string - healthAddr string - tokenTTL time.Duration - tlsOptions = flags.TLSOptions{} - logOptions = logs.NewOptions() + metricsBindAddr string + enableLeaderElection bool + leaderElectionLeaseDuration time.Duration + leaderElectionRenewDeadline time.Duration + leaderElectionRetryPeriod time.Duration + watchFilterValue string + watchNamespace string + profilerAddress string + enableContentionProfiling bool + clusterConcurrency int + clusterCacheTrackerConcurrency int + kubeadmConfigConcurrency int + syncPeriod time.Duration + restConfigQPS float32 + restConfigBurst int + webhookPort int + webhookCertDir string + healthAddr string + tokenTTL time.Duration + tlsOptions = flags.TLSOptions{} + logOptions = logs.NewOptions() ) // InitFlags initializes this manager's flags. @@ -126,7 +127,12 @@ func InitFlags(fs *pflag.FlagSet) { fs.BoolVar(&enableContentionProfiling, "contention-profiling", false, "Enable block profiling, if profiler-address is set.") + // Deprecated: This flag has no function anymore is going to be removed in a next release. + // Use clustercachetracker-concurrency instead. fs.IntVar(&clusterConcurrency, "cluster-concurrency", 10, + "This flag has no function anymore and got replaced by \"--clustercachetracker-concurrency\".") + + fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10, "Number of clusters to process simultaneously") fs.IntVar(&kubeadmConfigConcurrency, "kubeadmconfig-concurrency", 10, @@ -307,7 +313,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { Client: mgr.GetClient(), Tracker: tracker, WatchFilterValue: watchFilterValue, - }).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil { + }).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") os.Exit(1) } diff --git a/controlplane/kubeadm/main.go b/controlplane/kubeadm/main.go index f9a6bfab188f..fdce0978572b 100644 --- a/controlplane/kubeadm/main.go +++ b/controlplane/kubeadm/main.go @@ -91,6 +91,7 @@ var ( profilerAddress string enableContentionProfiling bool kubeadmControlPlaneConcurrency int + clusterCacheTrackerConcurrency int syncPeriod time.Duration restConfigQPS float32 restConfigBurst int @@ -134,6 +135,9 @@ func InitFlags(fs *pflag.FlagSet) { fs.IntVar(&kubeadmControlPlaneConcurrency, "kubeadmcontrolplane-concurrency", 10, "Number of kubeadm control planes to process simultaneously") + fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10, + "Number of clusters to process simultaneously") + fs.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "The minimum interval at which watched resources are reconciled (e.g. 15m)") @@ -320,7 +324,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { Client: mgr.GetClient(), Tracker: tracker, WatchFilterValue: watchFilterValue, - }).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil { + }).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") os.Exit(1) } diff --git a/main.go b/main.go index 7cb437958372..2890605f7654 100644 --- a/main.go +++ b/main.go @@ -82,34 +82,35 @@ var ( controllerName = "cluster-api-controller-manager" // flags. - metricsBindAddr string - enableLeaderElection bool - leaderElectionLeaseDuration time.Duration - leaderElectionRenewDeadline time.Duration - leaderElectionRetryPeriod time.Duration - watchNamespace string - watchFilterValue string - profilerAddress string - enableContentionProfiling bool - clusterTopologyConcurrency int - clusterClassConcurrency int - clusterConcurrency int - extensionConfigConcurrency int - machineConcurrency int - machineSetConcurrency int - machineDeploymentConcurrency int - machinePoolConcurrency int - clusterResourceSetConcurrency int - machineHealthCheckConcurrency int - syncPeriod time.Duration - restConfigQPS float32 - restConfigBurst int - nodeDrainClientTimeout time.Duration - webhookPort int - webhookCertDir string - healthAddr string - tlsOptions = flags.TLSOptions{} - logOptions = logs.NewOptions() + metricsBindAddr string + enableLeaderElection bool + leaderElectionLeaseDuration time.Duration + leaderElectionRenewDeadline time.Duration + leaderElectionRetryPeriod time.Duration + watchNamespace string + watchFilterValue string + profilerAddress string + enableContentionProfiling bool + clusterTopologyConcurrency int + clusterCacheTrackerConcurrency int + clusterClassConcurrency int + clusterConcurrency int + extensionConfigConcurrency int + machineConcurrency int + machineSetConcurrency int + machineDeploymentConcurrency int + machinePoolConcurrency int + clusterResourceSetConcurrency int + machineHealthCheckConcurrency int + syncPeriod time.Duration + restConfigQPS float32 + restConfigBurst int + nodeDrainClientTimeout time.Duration + webhookPort int + webhookCertDir string + healthAddr string + tlsOptions = flags.TLSOptions{} + logOptions = logs.NewOptions() ) func init() { @@ -177,6 +178,9 @@ func InitFlags(fs *pflag.FlagSet) { fs.IntVar(&clusterConcurrency, "cluster-concurrency", 10, "Number of clusters to process simultaneously") + fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10, + "Number of clusters to process simultaneously") + fs.IntVar(&extensionConfigConcurrency, "extensionconfig-concurrency", 10, "Number of extension configs to process simultaneously") @@ -394,7 +398,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { Client: mgr.GetClient(), Tracker: tracker, WatchFilterValue: watchFilterValue, - }).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil { + }).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") os.Exit(1) } diff --git a/test/infrastructure/docker/main.go b/test/infrastructure/docker/main.go index f87d8c777fab..9896dd04b445 100644 --- a/test/infrastructure/docker/main.go +++ b/test/infrastructure/docker/main.go @@ -68,24 +68,25 @@ var ( controllerName = "cluster-api-docker-controller-manager" // flags. - metricsBindAddr string - enableLeaderElection bool - leaderElectionLeaseDuration time.Duration - leaderElectionRenewDeadline time.Duration - leaderElectionRetryPeriod time.Duration - watchNamespace string - watchFilterValue string - profilerAddress string - enableContentionProfiling bool - concurrency int - syncPeriod time.Duration - restConfigQPS float32 - restConfigBurst int - webhookPort int - webhookCertDir string - healthAddr string - tlsOptions = flags.TLSOptions{} - logOptions = logs.NewOptions() + metricsBindAddr string + enableLeaderElection bool + leaderElectionLeaseDuration time.Duration + leaderElectionRenewDeadline time.Duration + leaderElectionRetryPeriod time.Duration + watchNamespace string + watchFilterValue string + profilerAddress string + enableContentionProfiling bool + concurrency int + clusterCacheTrackerConcurrency int + syncPeriod time.Duration + restConfigQPS float32 + restConfigBurst int + webhookPort int + webhookCertDir string + healthAddr string + tlsOptions = flags.TLSOptions{} + logOptions = logs.NewOptions() ) func init() { @@ -135,6 +136,9 @@ func initFlags(fs *pflag.FlagSet) { fs.IntVar(&concurrency, "concurrency", 10, "The number of docker machines to process simultaneously") + fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10, + "Number of clusters to process simultaneously") + fs.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "The minimum interval at which watched resources are reconciled (e.g. 15m)") @@ -316,7 +320,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { Tracker: tracker, WatchFilterValue: watchFilterValue, }).SetupWithManager(ctx, mgr, controller.Options{ - MaxConcurrentReconciles: concurrency, + MaxConcurrentReconciles: clusterCacheTrackerConcurrency, }); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler") os.Exit(1)