Skip to content

Commit

Permalink
clusters all the way down
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Crenshaw <[email protected]>
  • Loading branch information
crenshaw-dev committed Dec 18, 2024
1 parent 000890f commit d732690
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 86 deletions.
17 changes: 9 additions & 8 deletions cmd/argocd/commands/admin/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,14 +416,19 @@ func reconcileApplications(
var items []appReconcileResult
prevServer := ""
for _, app := range appsList.Items {
if prevServer != app.Spec.Destination.Server {
destCluster, err := argo.GetDestinationCluster(ctx, app.Spec.Destination, argoDB)
if err != nil {
return nil, fmt.Errorf("error getting destination cluster: %w", err)
}

if prevServer != destCluster.Server {
if prevServer != "" {
if clusterCache, err := stateCache.GetClusterCache(prevServer); err == nil {
if clusterCache, err := stateCache.GetClusterCache(destCluster); err == nil {
clusterCache.Invalidate()
}
}
printLine("Reconciling apps of %s", app.Spec.Destination.Server)
prevServer = app.Spec.Destination.Server
printLine("Reconciling apps of %s", destCluster.Server)
prevServer = destCluster.Server
}
printLine(app.Name)

Expand All @@ -437,10 +442,6 @@ func reconcileApplications(
sources = append(sources, app.Spec.GetSource())
revisions = append(revisions, app.Spec.GetSource().TargetRevision)

destCluster, err := argo.GetDestinationCluster(ctx, app.Spec.Destination, argoDB)
if err != nil {
return nil, fmt.Errorf("error getting destination cluster: %w", err)
}
res, err := appStateManager.CompareAppState(destCluster, &app, proj, revisions, sources, false, false, nil, false, false)
if err != nil {
return nil, fmt.Errorf("error comparing app states: %w", err)
Expand Down
10 changes: 5 additions & 5 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func (ctrl *ApplicationController) getResourceTree(destCluster *appv1.Cluster, a
orphanedNodesMap := make(map[kube.ResourceKey]appv1.ResourceNode)
warnOrphaned := true
if proj.Spec.OrphanedResources != nil {
orphanedNodesMap, err = ctrl.stateCache.GetNamespaceTopLevelResources(destCluster.Server, a.Spec.Destination.Namespace)
orphanedNodesMap, err = ctrl.stateCache.GetNamespaceTopLevelResources(destCluster, a.Spec.Destination.Namespace)
if err != nil {
return nil, fmt.Errorf("failed to get namespace top-level resources: %w", err)
}
Expand Down Expand Up @@ -589,7 +589,7 @@ func (ctrl *ApplicationController) getResourceTree(destCluster *appv1.Cluster, a
managedResourcesKeys = append(managedResourcesKeys, kube.GetResourceKey(live))
}
}
err = ctrl.stateCache.IterateHierarchyV2(destCluster.Server, managedResourcesKeys, func(child appv1.ResourceNode, appName string) bool {
err = ctrl.stateCache.IterateHierarchyV2(destCluster, managedResourcesKeys, func(child appv1.ResourceNode, appName string) bool {
permitted, _ := proj.IsResourcePermitted(schema.GroupKind{Group: child.ResourceRef.Group, Kind: child.ResourceRef.Kind}, child.Namespace, destCluster, func(project string) ([]*appv1.Cluster, error) {
clusters, err := ctrl.db.GetProjectClusters(context.TODO(), project)
if err != nil {
Expand All @@ -614,7 +614,7 @@ func (ctrl *ApplicationController) getResourceTree(destCluster *appv1.Cluster, a
orphanedNodesKeys = append(orphanedNodesKeys, k)
}
}
err = ctrl.stateCache.IterateHierarchyV2(destCluster.Server, orphanedNodesKeys, func(child appv1.ResourceNode, appName string) bool {
err = ctrl.stateCache.IterateHierarchyV2(destCluster, orphanedNodesKeys, func(child appv1.ResourceNode, appName string) bool {
belongToAnotherApp := false
if appName != "" {
appKey := ctrl.toAppKey(appName)
Expand Down Expand Up @@ -688,7 +688,7 @@ func (ctrl *ApplicationController) getAppHosts(destCluster *appv1.Cluster, a *ap
allNodesInfo := map[string]statecache.NodeInfo{}
allPodsByNode := map[string][]statecache.PodInfo{}
appPodsByNode := map[string][]statecache.PodInfo{}
err := ctrl.stateCache.IterateResources(destCluster.Server, func(res *clustercache.Resource, info *statecache.ResourceInfo) {
err := ctrl.stateCache.IterateResources(destCluster, func(res *clustercache.Resource, info *statecache.ResourceInfo) {
key := res.ResourceKey()

switch {
Expand Down Expand Up @@ -801,7 +801,7 @@ func (ctrl *ApplicationController) hideSecretData(destCluster *appv1.Cluster, ap
return nil, fmt.Errorf("error getting tracking method: %w", err)
}

clusterCache, err := ctrl.stateCache.GetClusterCache(destCluster.Server)
clusterCache, err := ctrl.stateCache.GetClusterCache(destCluster)
if err != nil {
return nil, fmt.Errorf("error getting cluster cache: %w", err)
}
Expand Down
51 changes: 23 additions & 28 deletions controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,21 @@ func init() {

type LiveStateCache interface {
// Returns k8s server version
GetVersionsInfo(serverURL string) (string, []kube.APIResourceInfo, error)
GetVersionsInfo(server *appv1.Cluster) (string, []kube.APIResourceInfo, error)
// Returns true of given group kind is a namespaced resource
IsNamespaced(server string, gk schema.GroupKind) (bool, error)
IsNamespaced(server *appv1.Cluster, gk schema.GroupKind) (bool, error)
// Returns synced cluster cache
GetClusterCache(server string) (clustercache.ClusterCache, error)
GetClusterCache(server *appv1.Cluster) (clustercache.ClusterCache, error)
// Executes give callback against resource specified by the key and all its children
IterateHierarchy(server string, key kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
IterateHierarchy(server *appv1.Cluster, key kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
// Executes give callback against resources specified by the keys and all its children
IterateHierarchyV2(server string, keys []kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
IterateHierarchyV2(server *appv1.Cluster, keys []kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error
// Returns state of live nodes which correspond for target nodes of specified application.
GetManagedLiveObjs(destCluster *appv1.Cluster, a *appv1.Application, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error)
// IterateResources iterates all resource stored in cache
IterateResources(server string, callback func(res *clustercache.Resource, info *ResourceInfo)) error
IterateResources(server *appv1.Cluster, callback func(res *clustercache.Resource, info *ResourceInfo)) error
// Returns all top level resources (resources without owner references) of a specified namespace
GetNamespaceTopLevelResources(server string, namespace string) (map[kube.ResourceKey]appv1.ResourceNode, error)
GetNamespaceTopLevelResources(server *appv1.Cluster, namespace string) (map[kube.ResourceKey]appv1.ResourceNode, error)
// Starts watching resources of each controlled cluster.
Run(ctx context.Context) error
// Returns information about monitored clusters
Expand Down Expand Up @@ -468,9 +468,9 @@ func isTransientNetworkErr(err error) bool {
return false
}

func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, error) {
func (c *liveStateCache) getCluster(cluster *appv1.Cluster) (clustercache.ClusterCache, error) {
c.lock.RLock()
clusterCache, ok := c.clusters[server]
clusterCache, ok := c.clusters[cluster.Server]
cacheSettings := c.cacheSettings
c.lock.RUnlock()

Expand All @@ -481,16 +481,11 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
c.lock.Lock()
defer c.lock.Unlock()

clusterCache, ok = c.clusters[server]
clusterCache, ok = c.clusters[cluster.Server]
if ok {
return clusterCache, nil
}

cluster, err := c.db.GetCluster(context.Background(), server)
if err != nil {
return nil, fmt.Errorf("error getting cluster: %w", err)
}

if c.clusterSharding == nil {
return nil, fmt.Errorf("unable to handle cluster %s: cluster sharding is not configured", cluster.Server)
}
Expand Down Expand Up @@ -628,12 +623,12 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e
c.metricsServer.ObserveResourceEventsProcessingDuration(cluster.Server, duration, processedEventsNumber)
})

c.clusters[server] = clusterCache
c.clusters[cluster.Server] = clusterCache

return clusterCache, nil
}

func (c *liveStateCache) getSyncedCluster(server string) (clustercache.ClusterCache, error) {
func (c *liveStateCache) getSyncedCluster(server *appv1.Cluster) (clustercache.ClusterCache, error) {
clusterCache, err := c.getCluster(server)
if err != nil {
return nil, fmt.Errorf("error getting cluster: %w", err)
Expand All @@ -658,15 +653,15 @@ func (c *liveStateCache) invalidate(cacheSettings cacheSettings) {
log.Info("live state cache invalidated")
}

func (c *liveStateCache) IsNamespaced(server string, gk schema.GroupKind) (bool, error) {
func (c *liveStateCache) IsNamespaced(server *appv1.Cluster, gk schema.GroupKind) (bool, error) {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return false, err
}
return clusterInfo.IsNamespaced(gk)
}

func (c *liveStateCache) IterateHierarchy(server string, key kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error {
func (c *liveStateCache) IterateHierarchy(server *appv1.Cluster, key kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return err
Expand All @@ -677,7 +672,7 @@ func (c *liveStateCache) IterateHierarchy(server string, key kube.ResourceKey, a
return nil
}

func (c *liveStateCache) IterateHierarchyV2(server string, keys []kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error {
func (c *liveStateCache) IterateHierarchyV2(server *appv1.Cluster, keys []kube.ResourceKey, action func(child appv1.ResourceNode, appName string) bool) error {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return err
Expand All @@ -688,7 +683,7 @@ func (c *liveStateCache) IterateHierarchyV2(server string, keys []kube.ResourceK
return nil
}

func (c *liveStateCache) IterateResources(server string, callback func(res *clustercache.Resource, info *ResourceInfo)) error {
func (c *liveStateCache) IterateResources(server *appv1.Cluster, callback func(res *clustercache.Resource, info *ResourceInfo)) error {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return err
Expand All @@ -702,7 +697,7 @@ func (c *liveStateCache) IterateResources(server string, callback func(res *clus
return nil
}

func (c *liveStateCache) GetNamespaceTopLevelResources(server string, namespace string) (map[kube.ResourceKey]appv1.ResourceNode, error) {
func (c *liveStateCache) GetNamespaceTopLevelResources(server *appv1.Cluster, namespace string) (map[kube.ResourceKey]appv1.ResourceNode, error) {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return nil, err
Expand All @@ -716,7 +711,7 @@ func (c *liveStateCache) GetNamespaceTopLevelResources(server string, namespace
}

func (c *liveStateCache) GetManagedLiveObjs(destCluster *appv1.Cluster, a *appv1.Application, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
clusterInfo, err := c.getSyncedCluster(destCluster.Server)
clusterInfo, err := c.getSyncedCluster(destCluster)
if err != nil {
return nil, fmt.Errorf("failed to get cluster info for %q: %w", destCluster.Server, err)
}
Expand All @@ -725,10 +720,10 @@ func (c *liveStateCache) GetManagedLiveObjs(destCluster *appv1.Cluster, a *appv1
})
}

func (c *liveStateCache) GetVersionsInfo(serverURL string) (string, []kube.APIResourceInfo, error) {
clusterInfo, err := c.getSyncedCluster(serverURL)
func (c *liveStateCache) GetVersionsInfo(server *appv1.Cluster) (string, []kube.APIResourceInfo, error) {
clusterInfo, err := c.getSyncedCluster(server)
if err != nil {
return "", nil, fmt.Errorf("failed to get cluster info for %q: %w", serverURL, err)
return "", nil, fmt.Errorf("failed to get cluster info for %q: %w", server, err)
}
return clusterInfo.GetServerVersion(), clusterInfo.GetAPIResources(), nil
}
Expand Down Expand Up @@ -828,7 +823,7 @@ func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) {
if c.isClusterHasApps(c.appInformer.GetStore().List(), cluster) {
go func() {
// warm up cache for cluster with apps
_, _ = c.getSyncedCluster(cluster.Server)
_, _ = c.getSyncedCluster(cluster)
}()
}
}
Expand Down Expand Up @@ -910,6 +905,6 @@ func (c *liveStateCache) GetClustersInfo() []clustercache.ClusterInfo {
return res
}

func (c *liveStateCache) GetClusterCache(server string) (clustercache.ClusterCache, error) {
func (c *liveStateCache) GetClusterCache(server *appv1.Cluster) (clustercache.ClusterCache, error) {
return c.getSyncedCluster(server)
}
Loading

0 comments on commit d732690

Please sign in to comment.