Skip to content

Commit

Permalink
fix: infer correct shard in statefulset setup (argoproj#17124, argopr…
Browse files Browse the repository at this point in the history
…oj#17016)  (argoproj#17167)

* fix: infer correct shard in statefulset setup

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix the case if only a single replica

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: resolving pointer on shard compare

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: add readlock for cluster accessor

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: use defer to protect access of 'shard'

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: revert locking in getclusteraccessor

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: handle nil shard case

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: handle any nil shard value as false

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: handle nil case and fix another missing pointer dereference 

Signed-off-by: Lukas Wöhrl <[email protected]>

* revert

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: added tests and fixed some behaviour bugs

Signed-off-by: Lukas Wöhrl <[email protected]>

* test: add test to validate that Shard value is not overriden

Signed-off-by: Lukas Wöhrl <[email protected]>

* fix: added tests and fixe the case when server is changed inside a secret

Signed-off-by: Lukas Wöhrl <[email protected]>

* tests: add test cases for infering the shard logic

Signed-off-by: Lukas Wöhrl <[email protected]>

---------

Signed-off-by: Lukas Wöhrl <[email protected]>
  • Loading branch information
woehrl01 authored and Julien Fuix committed Feb 14, 2024
1 parent 4ff2b99 commit 01cb830
Show file tree
Hide file tree
Showing 6 changed files with 766 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

Expand All @@ -26,7 +24,6 @@ import (
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/errors"
kubeutil "github.com/argoproj/argo-cd/v2/util/kube"
Expand Down Expand Up @@ -147,7 +144,7 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
clusterSharding, err := sharding.GetClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
Expand Down Expand Up @@ -239,64 +236,3 @@ func NewCommand() *cobra.Command {
})
return &command
}

func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) {
var (
replicasCount int
)
// StatefulSet mode and Deployment mode uses different default values for shard number.
defaultShardNumberValue := 0

if enableDynamicClusterDistribution {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

// if app controller deployment is not found when dynamic cluster distribution is enabled error out
if err != nil {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err)
}

if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicasCount = int(*appControllerDeployment.Spec.Replicas)
defaultShardNumberValue = -1
} else {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count")
}

} else {
replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}
shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32)
if replicasCount > 1 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if enableDynamicClusterDistribution {
var err error
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
shardNumber, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber)
if err != nil && !kubeerrors.IsConflict(err) {
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
errors.CheckError(err)
} else {
if shardNumber < 0 {
var err error
shardNumber, err = sharding.InferShard()
errors.CheckError(err)
}
if shardNumber > replicasCount {
log.Warnf("Calculated shard number %d is greated than the number of replicas count. Defaulting to 0", shardNumber)
shardNumber = 0
}
}
} else {
log.Info("Processing all cluster shards")
}
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
}
2 changes: 1 addition & 1 deletion controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) {
}

func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *appv1.Cluster) {
c.clusterSharding.Update(newCluster)
c.clusterSharding.Update(oldCluster, newCluster)
c.lock.Lock()
cluster, ok := c.clusters[newCluster.Server]
c.lock.Unlock()
Expand Down
60 changes: 41 additions & 19 deletions controller/sharding/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ClusterShardingCache interface {
Init(clusters *v1alpha1.ClusterList)
Add(c *v1alpha1.Cluster)
Delete(clusterServer string)
Update(c *v1alpha1.Cluster)
Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster)
IsManagedCluster(c *v1alpha1.Cluster) bool
GetDistribution() map[string]int
}
Expand All @@ -26,7 +26,7 @@ type ClusterSharding struct {
getClusterShard DistributionFunction
}

func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
clusterSharding := &ClusterSharding{
Shard: shard,
Expand Down Expand Up @@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
defer sharding.lock.Unlock()
newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
for _, c := range clusters.Items {
newClusters[c.Server] = &c
cluster := c
newClusters[c.Server] = &cluster
}
sharding.Clusters = newClusters
sharding.updateDistribution()
Expand Down Expand Up @@ -96,13 +97,16 @@ func (sharding *ClusterSharding) Delete(clusterServer string) {
}
}

func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {
func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) {
sharding.lock.Lock()
defer sharding.lock.Unlock()

old, ok := sharding.Clusters[c.Server]
sharding.Clusters[c.Server] = c
if !ok || hasShardingUpdates(old, c) {
if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server {
delete(sharding.Clusters, oldCluster.Server)
delete(sharding.Shards, oldCluster.Server)
}
sharding.Clusters[newCluster.Server] = newCluster
if hasShardingUpdates(oldCluster, newCluster) {
sharding.updateDistribution()
} else {
log.Debugf("Skipping sharding distribution update. No relevant changes")
Expand All @@ -111,8 +115,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {

func (sharding *ClusterSharding) GetDistribution() map[string]int {
sharding.lock.RLock()
defer sharding.lock.RUnlock()
shards := sharding.Shards
sharding.lock.RUnlock()

distribution := make(map[string]int, len(shards))
for k, v := range shards {
Expand All @@ -122,9 +126,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int {
}

func (sharding *ClusterSharding) updateDistribution() {
log.Info("Updating cluster shards")

for _, c := range sharding.Clusters {
for k, c := range sharding.Clusters {
shard := 0
if c.Shard != nil {
requestedShard := int(*c.Shard)
Expand All @@ -136,24 +138,44 @@ func (sharding *ClusterSharding) updateDistribution() {
} else {
shard = sharding.getClusterShard(c)
}
var shard64 int64 = int64(shard)
c.Shard = &shard64
sharding.Shards[c.Server] = shard

existingShard, ok := sharding.Shards[k]
if ok && existingShard != shard {
log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
} else if !ok {
log.Infof("Cluster %s has been assigned to shard %d", k, shard)
} else {
log.Debugf("Cluster %s has not changed shard", k)
}
sharding.Shards[k] = shard
}
}

// hasShardingUpdates returns true if the sharding distribution has been updated.
// nil checking is done for the corner case of the in-cluster cluster which may
// have a nil shard assigned
// hasShardingUpdates returns true if the sharding distribution has explicitly changed
func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) {
if old == nil || new == nil {
return false
}

// returns true if the cluster id has changed because some sharding algorithms depend on it.
if old.ID != new.ID {
return true
}

if old.Server != new.Server {
return true
}

// return false if the shard field has not been modified
if old.Shard == nil && new.Shard == nil {
return false
}
return old.Shard != new.Shard
return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
}

func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
return func() []*v1alpha1.Cluster {
// no need to lock, as this is only called from the updateDistribution function
clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters))
for _, c := range d.Clusters {
clusters = append(clusters, c)
Expand Down
Loading

0 comments on commit 01cb830

Please sign in to comment.