Skip to content

Commit

Permalink
feat(gc): Use SelfSubjectRulesReview to scan for garbage collectable …
Browse files Browse the repository at this point in the history
…resources
  • Loading branch information
astefanutti committed Jun 2, 2022
1 parent bb44c5a commit 8caeee3
Showing 1 changed file with 60 additions and 40 deletions.
100 changes: 60 additions & 40 deletions pkg/trait/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package trait

import (
"context"
"fmt"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/apache/camel-k/pkg/util"
authorization "k8s.io/api/authorization/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -35,7 +38,6 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/discovery/cached/memory"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
Expand Down Expand Up @@ -81,8 +83,7 @@ func (t *garbageCollectorTrait) Configure(e *Environment) (bool, error) {
t.DiscoveryCache = &s
}

return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(),
nil
return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(), nil
}

func (t *garbageCollectorTrait) Apply(e *Environment) error {
Expand All @@ -92,12 +93,9 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
// Register a post action that deletes the existing resources that are labelled
// with the previous integration generations.
// TODO: this should be refined so that it's run when all the replicas for the newer generation
// are ready. This is to be added when the integration scale status is refined with ready replicas
// are ready.
e.PostActions = append(e.PostActions, func(env *Environment) error {
// The collection and deletion are performed asynchronously to avoid blocking
// the reconciliation loop.
go t.garbageCollectResources(env)
return nil
return t.garbageCollectResources(env)
})

fallthrough
Expand All @@ -121,43 +119,40 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
return nil
}

func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) {
func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error {
deletableGVKs, err := t.getDeletableTypes(e)
if err != nil {
return fmt.Errorf("cannot discover GVK types: %v", err)
}

integration, _ := labels.NewRequirement(v1.IntegrationLabel, selection.Equals, []string{e.Integration.Name})
generation, err := labels.NewRequirement("camel.apache.org/generation", selection.LessThan, []string{strconv.FormatInt(e.Integration.GetGeneration(), 10)})
if err != nil {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot determine generation requirement")
return
return fmt.Errorf("cannot determine generation requirement: %v", err)
}
selector := labels.NewSelector().
Add(*integration).
Add(*generation)

deletableGVKs, err := t.getDeletableTypes(e)
if err != nil {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot discover GVK types")
return
}

t.deleteEachOf(deletableGVKs, e, selector)
return t.deleteEachOf(e.Ctx, deletableGVKs, e, selector)
}

func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) {
for gvk := range gvks {
func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, GVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error {
for GVK := range GVKs {
resources := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": gvk.GroupVersion().String(),
"kind": gvk.Kind,
"apiVersion": GVK.GroupVersion().String(),
"kind": GVK.Kind,
},
}
options := []ctrl.ListOption{
ctrl.InNamespace(e.Integration.Namespace),
ctrl.MatchingLabelsSelector{Selector: selector},
}
if err := t.Client.List(context.TODO(), &resources, options...); err != nil {
if !k8serrors.IsNotFound(err) && !k8serrors.IsForbidden(err) {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot list child resources: %v", gvk)
if err := t.Client.List(ctx, &resources, options...); err != nil {
if !k8serrors.IsNotFound(err) {
return fmt.Errorf("cannot list child resources: %v", err)
}

continue
}

Expand All @@ -166,7 +161,7 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st
if !t.canBeDeleted(e, r) {
continue
}
err := t.Client.Delete(context.TODO(), &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground))
err := t.Client.Delete(ctx, &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
Expand All @@ -177,6 +172,8 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st
}
}
}

return nil
}

func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unstructured) bool {
Expand All @@ -192,7 +189,7 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst
func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) {
// We rely on the discovery API to retrieve all the resources GVK,
// that results in an unbounded set that can impact garbage collection latency when scaling up.
discoveryClient, err := t.discoveryClient(e)
discoveryClient, err := t.discoveryClient()
if err != nil {
return nil, err
}
Expand All @@ -206,21 +203,45 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr

// We only take types that support the "delete" verb,
// to prevents from performing queries that we know are going to return "MethodNotAllowed".
return groupVersionKinds(discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)),
nil
}
APIResourceLists := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)

// Retrieve the permissions granted to the operator service account.
// We assume the operator has only to garbage collect the resources it has created.
srr := &authorization.SelfSubjectRulesReview{
Spec: authorization.SelfSubjectRulesReviewSpec{
Namespace: e.Integration.Namespace,
},
}
res, err := e.Client.AuthorizationV1().SelfSubjectRulesReviews().Create(e.Ctx, srr, metav1.CreateOptions{})
if err != nil {
return nil, err
}

func groupVersionKinds(rls []*metav1.APIResourceList) map[schema.GroupVersionKind]struct{} {
GVKs := map[schema.GroupVersionKind]struct{}{}
for _, rl := range rls {
for _, r := range rl.APIResources {
GVKs[schema.FromAPIVersionAndKind(rl.GroupVersion, r.Kind)] = struct{}{}
GVKs := make(map[schema.GroupVersionKind]struct{})
for _, APIResourceList := range APIResourceLists {
for _, resource := range APIResourceList.APIResources {
rule:
for _, rule := range res.Status.ResourceRules {
if !util.StringSliceContainsAnyOf(rule.Verbs, "delete", "*") {
continue
}
for _, group := range rule.APIGroups {
for _, name := range rule.Resources {
if (resource.Group == group || group == "*") && (resource.Name == name || name == "*") {
GVK := schema.FromAPIVersionAndKind(APIResourceList.GroupVersion, resource.Kind)
GVKs[GVK] = struct{}{}
break rule
}
}
}
}
}
}
return GVKs

return GVKs, nil
}

func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.DiscoveryInterface, error) {
func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) {
discoveryClientLock.Lock()
defer discoveryClientLock.Unlock()

Expand All @@ -247,7 +268,6 @@ func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.Disco
return t.Client.Discovery(), nil

default:
t.L.ForIntegration(e.Integration).Infof("unsupported discovery cache type: %s", *t.DiscoveryCache)
return t.Client.Discovery(), nil
return nil, fmt.Errorf("unsupported discovery cache type: %s", *t.DiscoveryCache)
}
}

0 comments on commit 8caeee3

Please sign in to comment.