Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: infer correct shard in statefulset setup (#17124, #17016) (#17167) #17204

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ import (
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/errors"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
"github.com/argoproj/argo-cd/v2/util/settings"
"github.com/argoproj/argo-cd/v2/util/tls"
"github.com/argoproj/argo-cd/v2/util/trace"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
Expand Down Expand Up @@ -147,7 +144,7 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
clusterSharding, err := sharding.GetClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
Expand Down Expand Up @@ -237,64 +234,3 @@ func NewCommand() *cobra.Command {
})
return &command
}

func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) {
var (
replicasCount int
)
// StatefulSet mode and Deployment mode uses different default values for shard number.
defaultShardNumberValue := 0

if enableDynamicClusterDistribution {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

// if app controller deployment is not found when dynamic cluster distribution is enabled error out
if err != nil {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err)
}

if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicasCount = int(*appControllerDeployment.Spec.Replicas)
defaultShardNumberValue = -1
} else {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count")
}

} else {
replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}
shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32)
if replicasCount > 1 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if enableDynamicClusterDistribution {
var err error
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
shardNumber, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber)
if err != nil && !kubeerrors.IsConflict(err) {
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
errors.CheckError(err)
} else {
if shardNumber < 0 {
var err error
shardNumber, err = sharding.InferShard()
errors.CheckError(err)
}
if shardNumber > replicasCount {
log.Warnf("Calculated shard number %d is greated than the number of replicas count. Defaulting to 0", shardNumber)
shardNumber = 0
}
}
} else {
log.Info("Processing all cluster shards")
}
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
}
2 changes: 1 addition & 1 deletion controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) {
}

func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *appv1.Cluster) {
c.clusterSharding.Update(newCluster)
c.clusterSharding.Update(oldCluster, newCluster)
c.lock.Lock()
cluster, ok := c.clusters[newCluster.Server]
c.lock.Unlock()
Expand Down
60 changes: 41 additions & 19 deletions controller/sharding/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ClusterShardingCache interface {
Init(clusters *v1alpha1.ClusterList)
Add(c *v1alpha1.Cluster)
Delete(clusterServer string)
Update(c *v1alpha1.Cluster)
Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster)
IsManagedCluster(c *v1alpha1.Cluster) bool
GetDistribution() map[string]int
}
Expand All @@ -26,7 +26,7 @@ type ClusterSharding struct {
getClusterShard DistributionFunction
}

func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
clusterSharding := &ClusterSharding{
Shard: shard,
Expand Down Expand Up @@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
defer sharding.lock.Unlock()
newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
for _, c := range clusters.Items {
newClusters[c.Server] = &c
cluster := c
newClusters[c.Server] = &cluster
}
sharding.Clusters = newClusters
sharding.updateDistribution()
Expand Down Expand Up @@ -96,13 +97,16 @@ func (sharding *ClusterSharding) Delete(clusterServer string) {
}
}

func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {
func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) {
sharding.lock.Lock()
defer sharding.lock.Unlock()

old, ok := sharding.Clusters[c.Server]
sharding.Clusters[c.Server] = c
if !ok || hasShardingUpdates(old, c) {
if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server {
delete(sharding.Clusters, oldCluster.Server)
delete(sharding.Shards, oldCluster.Server)
}
sharding.Clusters[newCluster.Server] = newCluster
if hasShardingUpdates(oldCluster, newCluster) {
sharding.updateDistribution()
} else {
log.Debugf("Skipping sharding distribution update. No relevant changes")
Expand All @@ -111,8 +115,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {

func (sharding *ClusterSharding) GetDistribution() map[string]int {
sharding.lock.RLock()
defer sharding.lock.RUnlock()
shards := sharding.Shards
sharding.lock.RUnlock()

distribution := make(map[string]int, len(shards))
for k, v := range shards {
Expand All @@ -122,9 +126,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int {
}

func (sharding *ClusterSharding) updateDistribution() {
log.Info("Updating cluster shards")

for _, c := range sharding.Clusters {
for k, c := range sharding.Clusters {
shard := 0
if c.Shard != nil {
requestedShard := int(*c.Shard)
Expand All @@ -136,24 +138,44 @@ func (sharding *ClusterSharding) updateDistribution() {
} else {
shard = sharding.getClusterShard(c)
}
var shard64 int64 = int64(shard)
c.Shard = &shard64
sharding.Shards[c.Server] = shard

existingShard, ok := sharding.Shards[k]
if ok && existingShard != shard {
log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
} else if !ok {
log.Infof("Cluster %s has been assigned to shard %d", k, shard)
} else {
log.Debugf("Cluster %s has not changed shard", k)
}
sharding.Shards[k] = shard
}
}

// hasShardingUpdates returns true if the sharding distribution has been updated.
// nil checking is done for the corner case of the in-cluster cluster which may
// have a nil shard assigned
// hasShardingUpdates returns true if the sharding distribution has explicitly changed
func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) {
if old == nil || new == nil {
return false
}

// returns true if the cluster id has changed because some sharding algorithms depend on it.
if old.ID != new.ID {
return true
}

if old.Server != new.Server {
return true
}

// return false if the shard field has not been modified
if old.Shard == nil && new.Shard == nil {
return false
}
return old.Shard != new.Shard
return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
}

func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
return func() []*v1alpha1.Cluster {
// no need to lock, as this is only called from the updateDistribution function
clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters))
for _, c := range d.Clusters {
clusters = append(clusters, c)
Expand Down
Loading
Loading