diff --git a/cmd/argocd/commands/admin/cluster.go b/cmd/argocd/commands/admin/cluster.go
index 69b5988f167d9..de0282789b6c1 100644
--- a/cmd/argocd/commands/admin/cluster.go
+++ b/cmd/argocd/commands/admin/cluster.go
@@ -118,7 +118,11 @@ func loadClusters(kubeClient *kubernetes.Clientset, appClient *versioned.Clients
 			cluster := batch[i]
 			clusterShard := 0
 			if replicas > 0 {
-				clusterShard = sharding.GetShardByID(cluster.ID, replicas)
+				if cluster.Shard == nil {
+					clusterShard = sharding.GetShardByID(cluster.ID, replicas)
+				} else {
+					clusterShard = int(*cluster.Shard)
+				}
 			}
 
 			if shard != -1 && clusterShard != shard {
@@ -194,6 +198,7 @@ func NewClusterShardsCommand() *cobra.Command {
 	command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
 	command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
 	cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
+	command.AddCommand(newBalanceClusterShardsCommand())
 	return &command
 }
 
@@ -216,6 +221,120 @@ func printStatsSummary(clusters []ClusterWithInfo) {
 	_ = w.Flush()
 }
 
+func newBalanceClusterShardsCommand() *cobra.Command {
+	var (
+		replicas         int
+		clientConfig     clientcmd.ClientConfig
+		cacheSrc         func() (*appstatecache.Cache, error)
+		portForwardRedis bool
+		dryRun           bool
+		reset            bool
+		deviationPercent int
+	)
+	var command = cobra.Command{
+		Use:   "balance",
+		Short: "Balance clusters across application controller shards and persist the assignments.",
+		Run: func(cmd *cobra.Command, args []string) {
+			log.SetLevel(log.WarnLevel)
+
+			clientCfg, err := clientConfig.ClientConfig()
+			errors.CheckError(err)
+			namespace, _, err := clientConfig.Namespace()
+			errors.CheckError(err)
+			kubeClient := kubernetes.NewForConfigOrDie(clientCfg)
+			appClient := versioned.NewForConfigOrDie(clientCfg)
+
+			if replicas == 0 {
+				replicas, err = getControllerReplicas(kubeClient, namespace)
+				errors.CheckError(err)
+			}
+			if replicas == 0 {
+				return
+			}
+
+			clusters, err := loadClusters(kubeClient, appClient, replicas, namespace, portForwardRedis, cacheSrc, -1)
+			errors.CheckError(err)
+
+			if len(clusters) == 0 {
+				return
+			}
+
+			if reset {
+				for i := range clusters {
+					c := clusters[i]
+					c.Shard = -1
+					clusters[i] = c
+				}
+			} else {
+				clusters = balanceClustersFirstFit(clusters, replicas, deviationPercent)
+			}
+			if dryRun {
+				_, _ = fmt.Fprintf(os.Stdout, "Dry run, not applying changes\n")
+			} else {
+				argoDB := db.NewDB(namespace, settings.NewSettingsManager(context.TODO(), kubeClient, namespace), kubeClient)
+
+				for _, c := range clusters {
+					cluster := &c.Cluster
+					oldShard := int64(-1)
+					if c.Cluster.Shard != nil {
+						oldShard = *cluster.Shard
+					}
+
+					if int64(c.Shard) != oldShard {
+						if c.Shard > -1 {
+							s := int64(c.Shard)
+							cluster.Shard = &s
+						} else {
+							c.Cluster.Shard = nil
+						}
+						_, err = argoDB.UpdateCluster(context.Background(), cluster)
+						errors.CheckError(err)
+						_, _ = fmt.Fprintf(os.Stdout, "Updated cluster %s with shard %d\n", cluster.Server, c.Shard)
+					}
+				}
+			}
+			printStatsSummary(clusters)
+		},
+	}
+
+	clientConfig = cli.AddKubectlFlagsToCmd(&command)
+	command.Flags().BoolVar(&dryRun, "dry-run", true, "Dry run. Print proposed shard assignments without applying them.")
+	command.Flags().BoolVar(&reset, "reset", false, "Remove shard from all clusters.")
+	command.Flags().IntVar(&deviationPercent, "deviation", 20, "Allowed deviation (%) from the perfect distribution during balancing.")
+	command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
+	command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
+	cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
+	return &command
+}
+
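+// balanceClustersFirstFit assigns clusters to shards in descending order of
+// cached resource count: clusters are packed into the current shard until
+// adding the next one would exceed the deviation-adjusted average resources
+// per replica, at which point the next shard is started. The last shard
+// absorbs any remainder.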
"replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified") + command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?") + cacheSrc = appstatecache.AddCacheFlagsToCmd(&command) + return &command +} + +func balanceClustersFirstFit(clusters []ClusterWithInfo, replicas int, deviationPercent int) []ClusterWithInfo { + sort.Slice(clusters, func(i, j int) bool { + return clusters[i].Info.CacheInfo.ResourcesCount > clusters[j].Info.CacheInfo.ResourcesCount + }) + totalResources := int64(0) + for _, c := range clusters { + totalResources += c.Info.CacheInfo.ResourcesCount + } + + deviation := 1.0 + float64(deviationPercent)/100.0 + avgPerReplica := int64(float64(totalResources) / float64(replicas) * deviation) + + nextShard := 0 + resourcesInShard := int64(0) + for i, c := range clusters { + if cnt := resourcesInShard + c.Info.CacheInfo.ResourcesCount; cnt > avgPerReplica && resourcesInShard != 0 && nextShard < replicas-1 { + nextShard++ + resourcesInShard = c.Info.CacheInfo.ResourcesCount + } else { + resourcesInShard = cnt + } + c.Shard = nextShard + clusters[i] = c + } + + return clusters +} + func runClusterNamespacesCommand(clientConfig clientcmd.ClientConfig, action func(appClient *versioned.Clientset, argoDB db.ArgoDB, clusters map[string][]string) error) error { clientCfg, err := clientConfig.ClientConfig() if err != nil {