Skip to content

Commit

Permalink
feat: add 'argocd admin cluster shard balance' command
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Matyushentsev <[email protected]>
  • Loading branch information
alexmt committed Dec 20, 2021
1 parent 487db97 commit 09db887
Showing 1 changed file with 120 additions and 1 deletion.
121 changes: 120 additions & 1 deletion cmd/argocd/commands/admin/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ func loadClusters(kubeClient *kubernetes.Clientset, appClient *versioned.Clients
cluster := batch[i]
clusterShard := 0
if replicas > 0 {
clusterShard = sharding.GetShardByID(cluster.ID, replicas)
if cluster.Shard == nil {
clusterShard = sharding.GetShardByID(cluster.ID, replicas)
} else {
clusterShard = int(*cluster.Shard)
}
}

if shard != -1 && clusterShard != shard {
Expand Down Expand Up @@ -194,6 +198,7 @@ func NewClusterShardsCommand() *cobra.Command {
command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
command.AddCommand(newBalanceClusterShardsCommand())
return &command
}

Expand All @@ -216,6 +221,120 @@ func printStatsSummary(clusters []ClusterWithInfo) {
_ = w.Flush()
}

func newBalanceClusterShardsCommand() *cobra.Command {
var (
replicas int
clientConfig clientcmd.ClientConfig
cacheSrc func() (*appstatecache.Cache, error)
portForwardRedis bool
dryRun bool
reset bool
deviationPercent int
)
var command = cobra.Command{
Use: "balance",
Short: "Print information about each controller shard and portion of Kubernetes resources it is responsible for.",
Run: func(cmd *cobra.Command, args []string) {
log.SetLevel(log.WarnLevel)

clientCfg, err := clientConfig.ClientConfig()
errors.CheckError(err)
namespace, _, err := clientConfig.Namespace()
errors.CheckError(err)
kubeClient := kubernetes.NewForConfigOrDie(clientCfg)
appClient := versioned.NewForConfigOrDie(clientCfg)

if replicas == 0 {
replicas, err = getControllerReplicas(kubeClient, namespace)
errors.CheckError(err)
}
if replicas == 0 {
return
}

clusters, err := loadClusters(kubeClient, appClient, replicas, namespace, portForwardRedis, cacheSrc, -1)
errors.CheckError(err)

if len(clusters) == 0 {
return
}

if reset {
for i := range clusters {
c := clusters[i]
c.Shard = -1
clusters[i] = c
}
} else {
clusters = balanceClustersFirstFit(clusters, replicas, deviationPercent)
}
if dryRun {
_, _ = fmt.Fprintf(os.Stdout, "Dry run, not applying changes\n")
} else {
argoDB := db.NewDB(namespace, settings.NewSettingsManager(context.TODO(), kubeClient, namespace), kubeClient)

for _, c := range clusters {
cluster := &c.Cluster
oldShard := int64(-1)
if c.Cluster.Shard != nil {
oldShard = *cluster.Shard
}

if int64(c.Shard) != oldShard {
if c.Shard > -1 {
s := int64(c.Shard)
cluster.Shard = &s
} else {
c.Cluster.Shard = nil
}
_, err = argoDB.UpdateCluster(context.Background(), cluster)
errors.CheckError(err)
_, _ = fmt.Fprintf(os.Stdout, "Updated cluster %s with shard %d\n", cluster.Server, c.Shard)
}
}
}
printStatsSummary(clusters)
},
}

clientConfig = cli.AddKubectlFlagsToCmd(&command)
command.Flags().BoolVar(&dryRun, "dry-run", true, "Dry run.")
command.Flags().BoolVar(&reset, "reset", false, "Remove shard from all clusters.")
command.Flags().IntVar(&deviationPercent, "deviation", 20, "Allowed deviation (%) from the perfect distribution during.")
command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
return &command
}

func balanceClustersFirstFit(clusters []ClusterWithInfo, replicas int, deviationPercent int) []ClusterWithInfo {
sort.Slice(clusters, func(i, j int) bool {
return clusters[i].Info.CacheInfo.ResourcesCount > clusters[j].Info.CacheInfo.ResourcesCount
})
totalResources := int64(0)
for _, c := range clusters {
totalResources += c.Info.CacheInfo.ResourcesCount
}

deviation := 1.0 + float64(deviationPercent)/100.0
avgPerReplica := int64(float64(totalResources) / float64(replicas) * deviation)

nextShard := 0
resourcesInShard := int64(0)
for i, c := range clusters {
if cnt := resourcesInShard + c.Info.CacheInfo.ResourcesCount; cnt > avgPerReplica && resourcesInShard != 0 && nextShard < replicas-1 {
nextShard++
resourcesInShard = c.Info.CacheInfo.ResourcesCount
} else {
resourcesInShard = cnt
}
c.Shard = nextShard
clusters[i] = c
}

return clusters
}

func runClusterNamespacesCommand(clientConfig clientcmd.ClientConfig, action func(appClient *versioned.Clientset, argoDB db.ArgoDB, clusters map[string][]string) error) error {
clientCfg, err := clientConfig.ClientConfig()
if err != nil {
Expand Down

0 comments on commit 09db887

Please sign in to comment.