Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gc): Use SelfSubjectRulesReview to scan for garbage collectable resources #3326

Merged
merged 3 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
gopkg.in/inf.v0 v0.9.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.22.5
Expand Down
145 changes: 89 additions & 56 deletions pkg/trait/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package trait

import (
"context"
"fmt"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"golang.org/x/time/rate"

authorization "k8s.io/api/authorization/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -35,17 +40,20 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/discovery/cached/memory"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util"
)

var (
toFileName = regexp.MustCompile(`[^(\w/\.)]`)
diskCachedDiscoveryClient discovery.CachedDiscoveryInterface
memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface
discoveryClientLock sync.Mutex
toFileName = regexp.MustCompile(`[^(\w/\.)]`)

lock sync.Mutex
rateLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
collectableGVKs = make(map[schema.GroupVersionKind]struct{})
memoryCachedDiscovery discovery.CachedDiscoveryInterface
diskCachedDiscovery discovery.CachedDiscoveryInterface
)

type discoveryCacheType string
Expand Down Expand Up @@ -81,8 +89,7 @@ func (t *garbageCollectorTrait) Configure(e *Environment) (bool, error) {
t.DiscoveryCache = &s
}

return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(),
nil
return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(), nil
}

func (t *garbageCollectorTrait) Apply(e *Environment) error {
Expand All @@ -92,12 +99,9 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
// Register a post action that deletes the existing resources that are labelled
// with the previous integration generations.
// TODO: this should be refined so that it's run when all the replicas for the newer generation
// are ready. This is to be added when the integration scale status is refined with ready replicas
// are ready.
e.PostActions = append(e.PostActions, func(env *Environment) error {
// The collection and deletion are performed asynchronously to avoid blocking
// the reconciliation loop.
go t.garbageCollectResources(env)
return nil
return t.garbageCollectResources(env)
})

fallthrough
Expand All @@ -121,43 +125,40 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
return nil
}

func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) {
func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error {
deletableGVKs, err := t.getDeletableTypes(e)
if err != nil {
return errors.Wrap(err, "cannot discover GVK types")
}

integration, _ := labels.NewRequirement(v1.IntegrationLabel, selection.Equals, []string{e.Integration.Name})
generation, err := labels.NewRequirement("camel.apache.org/generation", selection.LessThan, []string{strconv.FormatInt(e.Integration.GetGeneration(), 10)})
if err != nil {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot determine generation requirement")
return
return errors.Wrap(err, "cannot determine generation requirement")
}
selector := labels.NewSelector().
Add(*integration).
Add(*generation)

deletableGVKs, err := t.getDeletableTypes(e)
if err != nil {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot discover GVK types")
return
}

t.deleteEachOf(deletableGVKs, e, selector)
return t.deleteEachOf(e.Ctx, deletableGVKs, e, selector)
}

func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) {
for gvk := range gvks {
func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, deletableGVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error {
for GVK := range deletableGVKs {
resources := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": gvk.GroupVersion().String(),
"kind": gvk.Kind,
"apiVersion": GVK.GroupVersion().String(),
"kind": GVK.Kind,
},
}
options := []ctrl.ListOption{
ctrl.InNamespace(e.Integration.Namespace),
ctrl.MatchingLabelsSelector{Selector: selector},
}
if err := t.Client.List(context.TODO(), &resources, options...); err != nil {
if !k8serrors.IsNotFound(err) && !k8serrors.IsForbidden(err) {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot list child resources: %v", gvk)
if err := t.Client.List(ctx, &resources, options...); err != nil {
if !k8serrors.IsNotFound(err) {
return errors.Wrap(err, "cannot list child resources")
}

continue
}

Expand All @@ -166,7 +167,7 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st
if !t.canBeDeleted(e, r) {
continue
}
err := t.Client.Delete(context.TODO(), &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground))
err := t.Client.Delete(ctx, &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
Expand All @@ -177,6 +178,8 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st
}
}
}

return nil
}

func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unstructured) bool {
Expand All @@ -190,64 +193,94 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst
}

func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) {
lock.Lock()
defer lock.Unlock()

// Rate limit to avoid Discovery and SelfSubjectRulesReview requests at every reconciliation.
if !rateLimiter.Allow() {
// Return the cached set of garbage collectable GVKs.
return collectableGVKs, nil
}

// We rely on the discovery API to retrieve all the resources GVK,
// that results in an unbounded set that can impact garbage collection latency when scaling up.
discoveryClient, err := t.discoveryClient(e)
discoveryClient, err := t.discoveryClient()
if err != nil {
return nil, err
}
resources, err := discoveryClient.ServerPreferredNamespacedResources()
// Swallow group discovery errors, e.g., Knative serving exposes
// an aggregated API for custom.metrics.k8s.io that requires special
// authentication scheme while discovering preferred resources
// authentication scheme while discovering preferred resources.
if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
return nil, err
}

// We only take types that support the "delete" verb,
// to prevents from performing queries that we know are going to return "MethodNotAllowed".
return groupVersionKinds(discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)),
nil
}
APIResourceLists := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)

// Retrieve the permissions granted to the operator service account.
// We assume the operator has only to garbage collect the resources it has created.
srr := &authorization.SelfSubjectRulesReview{
Copy link
Member

@tadayosi tadayosi Jun 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for confirmation, don't we need to also check it with OpenShift version of SSRR if it's on OpenShift? Is it redundant?
https://docs.openshift.com/container-platform/4.10/rest_api/authorization_apis/selfsubjectrulesreview-authorization-openshift-io-v1.html

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the testing I've done, authorization.k8s.io works also in OpenShift. Generally it seems vanilla k8s APIs are supported in OpenShift and somehow translated into their OpenShift counterpart if any.

Spec: authorization.SelfSubjectRulesReviewSpec{
Namespace: e.Integration.Namespace,
},
}
res, err := e.Client.AuthorizationV1().SelfSubjectRulesReviews().Create(e.Ctx, srr, metav1.CreateOptions{})
if err != nil {
return nil, err
}

func groupVersionKinds(rls []*metav1.APIResourceList) map[schema.GroupVersionKind]struct{} {
GVKs := map[schema.GroupVersionKind]struct{}{}
for _, rl := range rls {
for _, r := range rl.APIResources {
GVKs[schema.FromAPIVersionAndKind(rl.GroupVersion, r.Kind)] = struct{}{}
GVKs := make(map[schema.GroupVersionKind]struct{})
for _, APIResourceList := range APIResourceLists {
for _, resource := range APIResourceList.APIResources {
rule:
for _, rule := range res.Status.ResourceRules {
if !util.StringSliceContainsAnyOf(rule.Verbs, "delete", "*") {
continue
}
for _, group := range rule.APIGroups {
for _, name := range rule.Resources {
if (resource.Group == group || group == "*") && (resource.Name == name || name == "*") {
GVK := schema.FromAPIVersionAndKind(APIResourceList.GroupVersion, resource.Kind)
GVKs[GVK] = struct{}{}
break rule
}
}
}
}
}
}
return GVKs
}
collectableGVKs = GVKs

func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.DiscoveryInterface, error) {
discoveryClientLock.Lock()
defer discoveryClientLock.Unlock()
return collectableGVKs, nil
}

func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) {
switch *t.DiscoveryCache {
case diskDiscoveryCache:
if diskCachedDiscoveryClient != nil {
return diskCachedDiscoveryClient, nil
if diskCachedDiscovery != nil {
return diskCachedDiscovery, nil
}
config := t.Client.GetConfig()
httpCacheDir := filepath.Join(mustHomeDir(), ".kube", "http-cache")
diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", "discovery", toHostDir(config.Host))
var err error
diskCachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
return diskCachedDiscoveryClient, err
diskCachedDiscovery, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
return diskCachedDiscovery, err

case memoryDiscoveryCache:
if memoryCachedDiscoveryClient != nil {
return memoryCachedDiscoveryClient, nil
if memoryCachedDiscovery != nil {
return memoryCachedDiscovery, nil
}
memoryCachedDiscoveryClient = memory.NewMemCacheClient(t.Client.Discovery())
return memoryCachedDiscoveryClient, nil
memoryCachedDiscovery = memory.NewMemCacheClient(t.Client.Discovery())
return memoryCachedDiscovery, nil

case disabledDiscoveryCache, "":
return t.Client.Discovery(), nil

default:
t.L.ForIntegration(e.Integration).Infof("unsupported discovery cache type: %s", *t.DiscoveryCache)
return t.Client.Discovery(), nil
return nil, fmt.Errorf("unsupported discovery cache type: %s", *t.DiscoveryCache)
}
}