From ead2110612096e3b1b639557e8c30e11ce66f073 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 1 Jun 2022 16:10:58 +0200 Subject: [PATCH 1/3] feat(gc): Use SelfSubjectRulesReview to scan for garbage collectable resources --- pkg/trait/gc.go | 100 +++++++++++++++++++++++++++++------------------- 1 file changed, 60 insertions(+), 40 deletions(-) diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go index 1ff3204f5b..b118029000 100644 --- a/pkg/trait/gc.go +++ b/pkg/trait/gc.go @@ -19,6 +19,7 @@ package trait import ( "context" + "fmt" "path/filepath" "regexp" "strconv" @@ -26,6 +27,8 @@ import ( "sync" "time" + "github.com/apache/camel-k/pkg/util" + authorization "k8s.io/api/authorization/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -35,7 +38,6 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/discovery/cached/disk" "k8s.io/client-go/discovery/cached/memory" - ctrl "sigs.k8s.io/controller-runtime/pkg/client" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" @@ -81,8 +83,7 @@ func (t *garbageCollectorTrait) Configure(e *Environment) (bool, error) { t.DiscoveryCache = &s } - return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(), - nil + return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(), nil } func (t *garbageCollectorTrait) Apply(e *Environment) error { @@ -92,12 +93,9 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error { // Register a post action that deletes the existing resources that are labelled // with the previous integration generations. // TODO: this should be refined so that it's run when all the replicas for the newer generation - // are ready. This is to be added when the integration scale status is refined with ready replicas + // are ready. e.PostActions = append(e.PostActions, func(env *Environment) error { - // The collection and deletion are performed asynchronously to avoid blocking - // the reconciliation loop. - go t.garbageCollectResources(env) - return nil + return t.garbageCollectResources(env) }) fallthrough @@ -121,43 +119,40 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error { return nil } -func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) { +func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error { + deletableGVKs, err := t.getDeletableTypes(e) + if err != nil { + return fmt.Errorf("cannot discover GVK types: %v", err) + } + integration, _ := labels.NewRequirement(v1.IntegrationLabel, selection.Equals, []string{e.Integration.Name}) generation, err := labels.NewRequirement("camel.apache.org/generation", selection.LessThan, []string{strconv.FormatInt(e.Integration.GetGeneration(), 10)}) if err != nil { - t.L.ForIntegration(e.Integration).Errorf(err, "cannot determine generation requirement") - return + return fmt.Errorf("cannot determine generation requirement: %v", err) } selector := labels.NewSelector(). Add(*integration). Add(*generation) - deletableGVKs, err := t.getDeletableTypes(e) - if err != nil { - t.L.ForIntegration(e.Integration).Errorf(err, "cannot discover GVK types") - return - } - - t.deleteEachOf(deletableGVKs, e, selector) + return t.deleteEachOf(e.Ctx, deletableGVKs, e, selector) } -func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) { - for gvk := range gvks { +func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, GVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error { + for GVK := range GVKs { resources := unstructured.UnstructuredList{ Object: map[string]interface{}{ - "apiVersion": gvk.GroupVersion().String(), - "kind": gvk.Kind, + "apiVersion": GVK.GroupVersion().String(), + "kind": GVK.Kind, }, } options := []ctrl.ListOption{ ctrl.InNamespace(e.Integration.Namespace), ctrl.MatchingLabelsSelector{Selector: selector}, } - if err := t.Client.List(context.TODO(), &resources, options...); err != nil { - if !k8serrors.IsNotFound(err) && !k8serrors.IsForbidden(err) { - t.L.ForIntegration(e.Integration).Errorf(err, "cannot list child resources: %v", gvk) + if err := t.Client.List(ctx, &resources, options...); err != nil { + if !k8serrors.IsNotFound(err) { + return fmt.Errorf("cannot list child resources: %v", err) } - continue } @@ -166,7 +161,7 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st if !t.canBeDeleted(e, r) { continue } - err := t.Client.Delete(context.TODO(), &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground)) + err := t.Client.Delete(ctx, &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground)) if err != nil { // The resource may have already been deleted if !k8serrors.IsNotFound(err) { @@ -177,6 +172,8 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st } } } + + return nil } func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unstructured) bool { @@ -192,7 +189,7 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) { // We rely on the discovery API to retrieve all the resources GVK, // that results in an unbounded set that can impact garbage collection latency when scaling up. - discoveryClient, err := t.discoveryClient(e) + discoveryClient, err := t.discoveryClient() if err != nil { return nil, err } @@ -206,21 +203,45 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr // We only take types that support the "delete" verb, // to prevents from performing queries that we know are going to return "MethodNotAllowed". - return groupVersionKinds(discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)), - nil -} + APIResourceLists := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources) + + // Retrieve the permissions granted to the operator service account. + // We assume the operator has only to garbage collect the resources it has created. + srr := &authorization.SelfSubjectRulesReview{ + Spec: authorization.SelfSubjectRulesReviewSpec{ + Namespace: e.Integration.Namespace, + }, + } + res, err := e.Client.AuthorizationV1().SelfSubjectRulesReviews().Create(e.Ctx, srr, metav1.CreateOptions{}) + if err != nil { + return nil, err + } -func groupVersionKinds(rls []*metav1.APIResourceList) map[schema.GroupVersionKind]struct{} { - GVKs := map[schema.GroupVersionKind]struct{}{} - for _, rl := range rls { - for _, r := range rl.APIResources { - GVKs[schema.FromAPIVersionAndKind(rl.GroupVersion, r.Kind)] = struct{}{} + GVKs := make(map[schema.GroupVersionKind]struct{}) + for _, APIResourceList := range APIResourceLists { + for _, resource := range APIResourceList.APIResources { + rule: + for _, rule := range res.Status.ResourceRules { + if !util.StringSliceContainsAnyOf(rule.Verbs, "delete", "*") { + continue + } + for _, group := range rule.APIGroups { + for _, name := range rule.Resources { + if (resource.Group == group || group == "*") && (resource.Name == name || name == "*") { + GVK := schema.FromAPIVersionAndKind(APIResourceList.GroupVersion, resource.Kind) + GVKs[GVK] = struct{}{} + break rule + } + } + } + } } } - return GVKs + + return GVKs, nil } -func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.DiscoveryInterface, error) { +func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) { discoveryClientLock.Lock() defer discoveryClientLock.Unlock() @@ -247,7 +268,6 @@ func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.Disco return t.Client.Discovery(), nil default: - t.L.ForIntegration(e.Integration).Infof("unsupported discovery cache type: %s", *t.DiscoveryCache) - return t.Client.Discovery(), nil + return nil, fmt.Errorf("unsupported discovery cache type: %s", *t.DiscoveryCache) } } From f5d1f7cc196bea9c256b348281e7a0a7967cca17 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 1 Jun 2022 18:46:20 +0200 Subject: [PATCH 2/3] feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests --- go.mod | 1 + pkg/trait/gc.go | 46 +++++++++++++++++++++++++++++----------------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 1fd5047e7c..33bc7bbd09 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 + golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect gopkg.in/inf.v0 v0.9.1 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.22.5 diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go index b118029000..6ca3af7904 100644 --- a/pkg/trait/gc.go +++ b/pkg/trait/gc.go @@ -27,6 +27,8 @@ import ( "sync" "time" + "golang.org/x/time/rate" + "github.com/apache/camel-k/pkg/util" authorization "k8s.io/api/authorization/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -44,10 +46,13 @@ import ( ) var ( - toFileName = regexp.MustCompile(`[^(\w/\.)]`) - diskCachedDiscoveryClient discovery.CachedDiscoveryInterface - memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface - discoveryClientLock sync.Mutex + toFileName = regexp.MustCompile(`[^(\w/\.)]`) + + lock sync.Mutex + rateLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) + collectableGVKs = make(map[schema.GroupVersionKind]struct{}) + memoryCachedDiscovery discovery.CachedDiscoveryInterface + diskCachedDiscovery discovery.CachedDiscoveryInterface ) type discoveryCacheType string @@ -187,6 +192,15 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst } func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) { + lock.Lock() + defer lock.Unlock() + + // Rate limit to avoid Discovery and SelfSubjectRulesReview requests at every reconciliation. + if !rateLimiter.Allow() { + // Return the cached set of garbage collectable GVKs. + return collectableGVKs, nil + } + // We rely on the discovery API to retrieve all the resources GVK, // that results in an unbounded set that can impact garbage collection latency when scaling up. discoveryClient, err := t.discoveryClient() @@ -196,7 +210,7 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr resources, err := discoveryClient.ServerPreferredNamespacedResources() // Swallow group discovery errors, e.g., Knative serving exposes // an aggregated API for custom.metrics.k8s.io that requires special - // authentication scheme while discovering preferred resources + // authentication scheme while discovering preferred resources. if err != nil && !discovery.IsGroupDiscoveryFailedError(err) { return nil, err } @@ -237,32 +251,30 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr } } } + collectableGVKs = GVKs - return GVKs, nil + return collectableGVKs, nil } func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) { - discoveryClientLock.Lock() - defer discoveryClientLock.Unlock() - switch *t.DiscoveryCache { case diskDiscoveryCache: - if diskCachedDiscoveryClient != nil { - return diskCachedDiscoveryClient, nil + if diskCachedDiscovery != nil { + return diskCachedDiscovery, nil } config := t.Client.GetConfig() httpCacheDir := filepath.Join(mustHomeDir(), ".kube", "http-cache") diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", "discovery", toHostDir(config.Host)) var err error - diskCachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute) - return diskCachedDiscoveryClient, err + diskCachedDiscovery, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute) + return diskCachedDiscovery, err case memoryDiscoveryCache: - if memoryCachedDiscoveryClient != nil { - return memoryCachedDiscoveryClient, nil + if memoryCachedDiscovery != nil { + return memoryCachedDiscovery, nil } - memoryCachedDiscoveryClient = memory.NewMemCacheClient(t.Client.Discovery()) - return memoryCachedDiscoveryClient, nil + memoryCachedDiscovery = memory.NewMemCacheClient(t.Client.Discovery()) + return memoryCachedDiscovery, nil case disabledDiscoveryCache, "": return t.Client.Discovery(), nil From bb7d7350dfafe2464c77bb09b866bc166310945a Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 1 Jun 2022 19:07:20 +0200 Subject: [PATCH 3/3] chore(gc): Fix lint errors --- pkg/trait/gc.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go index 6ca3af7904..a088367fb5 100644 --- a/pkg/trait/gc.go +++ b/pkg/trait/gc.go @@ -27,9 +27,9 @@ import ( "sync" "time" + "github.com/pkg/errors" "golang.org/x/time/rate" - "github.com/apache/camel-k/pkg/util" authorization "k8s.io/api/authorization/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,6 +43,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime/pkg/client" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/util" ) var ( @@ -127,13 +128,13 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error { func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error { deletableGVKs, err := t.getDeletableTypes(e) if err != nil { - return fmt.Errorf("cannot discover GVK types: %v", err) + return errors.Wrap(err, "cannot discover GVK types") } integration, _ := labels.NewRequirement(v1.IntegrationLabel, selection.Equals, []string{e.Integration.Name}) generation, err := labels.NewRequirement("camel.apache.org/generation", selection.LessThan, []string{strconv.FormatInt(e.Integration.GetGeneration(), 10)}) if err != nil { - return fmt.Errorf("cannot determine generation requirement: %v", err) + return errors.Wrap(err, "cannot determine generation requirement") } selector := labels.NewSelector(). Add(*integration). @@ -142,8 +143,8 @@ func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error { return t.deleteEachOf(e.Ctx, deletableGVKs, e, selector) } -func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, GVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error { - for GVK := range GVKs { +func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, deletableGVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error { + for GVK := range deletableGVKs { resources := unstructured.UnstructuredList{ Object: map[string]interface{}{ "apiVersion": GVK.GroupVersion().String(), @@ -156,7 +157,7 @@ func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, GVKs map[schem } if err := t.Client.List(ctx, &resources, options...); err != nil { if !k8serrors.IsNotFound(err) { - return fmt.Errorf("cannot list child resources: %v", err) + return errors.Wrap(err, "cannot list child resources") } continue }