Skip to content

Commit

Permalink
helm-operator: reduce cache memory footprint
Browse files Browse the repository at this point in the history
Make use of label selectors used by informers for both the primary CR
and for chart manifest objects

Signed-off-by: Joe Lanford <[email protected]>
  • Loading branch information
joelanford committed Oct 2, 2023
1 parent d21ed64 commit ce8c010
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 82 deletions.
9 changes: 9 additions & 0 deletions changelog/fragments/helm-operator-cache-selectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
entries:
- description: >
(helm): Use informer cache label selectors to reduce memory consumption.
kind: bugfix
breaking: false
- description: >
(helm): Fix bug with detection of owner reference support when setting up dynamic watches
kind: bugfix
breaking: false
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
k8s.io/client-go v0.26.2
k8s.io/kubectl v0.26.2
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
sigs.k8s.io/controller-runtime v0.14.5
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/controller-tools v0.11.3
sigs.k8s.io/kubebuilder/v3 v3.9.1
sigs.k8s.io/yaml v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1658,8 +1658,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo=
sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s=
sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-tools v0.11.3 h1:T1xzLkog9saiyQSLz1XOImu4OcbdXWytc5cmYsBeBiE=
sigs.k8s.io/controller-tools v0.11.3/go.mod h1:qcfX7jfcfYD/b7lAhvqAyTbt/px4GpvN88WKLFFv7p8=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
Expand Down
58 changes: 53 additions & 5 deletions internal/cmd/helm-operator/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -45,6 +46,10 @@ import (
"github.com/operator-framework/operator-sdk/internal/helm/watches"
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
sdkVersion "github.com/operator-framework/operator-sdk/internal/version"
"helm.sh/helm/v3/pkg/chart/loader"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

var log = logf.Log.WithName("cmd")
Expand Down Expand Up @@ -136,6 +141,21 @@ func run(cmd *cobra.Command, f *flags.Flags) {
// Set default manager options
options = f.ToManagerOptions(options)

if options.Scheme == nil {
options.Scheme = apimachruntime.NewScheme()
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to create new manager factories.")
os.Exit(1)
}

options.NewCache, err = buildCacheFunc(ws, options.Scheme)
if err != nil {
log.Error(err, "Failed to build NewCache function for manager.")
os.Exit(1)
}
if options.NewClient == nil {
options.NewClient = func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
// Create the Client for Write operations.
Expand Down Expand Up @@ -189,11 +209,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
os.Exit(1)
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to create new manager factories.")
os.Exit(1)
}
acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger())
if err != nil {
log.Error(err, "Failed to create Helm action config getter")
Expand Down Expand Up @@ -250,3 +265,36 @@ func exitIfUnsupported(options manager.Options) {
os.Exit(1)
}
}

func buildCacheFunc(ws []watches.Watch, sch *apimachruntime.Scheme) (cache.NewCacheFunc, error) {
selectorsByObject := cache.SelectorsByObject{}
chartNames := make([]string, 0, len(ws))
for _, w := range ws {
sch.AddKnownTypeWithName(w.GroupVersionKind, &unstructured.Unstructured{})

crObj := &unstructured.Unstructured{}
crObj.SetGroupVersionKind(w.GroupVersionKind)
sel, err := metav1.LabelSelectorAsSelector(&w.Selector)
if err != nil {
return nil, fmt.Errorf("unable to parse watch selector for %s: %v", w.GroupVersionKind, err)
}
selectorsByObject[crObj] = cache.ObjectSelector{Label: sel}

chrt, err := loader.LoadDir(w.ChartDir)
if err != nil {
return nil, fmt.Errorf("unable to load chart for %s: %v", w.GroupVersionKind, err)
}
chartNames = append(chartNames, chrt.Name())
}

req, err := labels.NewRequirement("helm.sdk.operatorframework.io/chart", selection.In, chartNames)
if err != nil {
return nil, fmt.Errorf("unable to create label requirement for cache default selector: %v", err)
}
defaultSelector := labels.NewSelector().Add(*req)

return cache.BuilderWithOptions(cache.Options{
SelectorsByObject: selectorsByObject,
DefaultSelector: cache.ObjectSelector{Label: defaultSelector},
}), nil
}
42 changes: 42 additions & 0 deletions internal/helm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,45 @@ func containsResourcePolicyKeep(annotations map[string]string) bool {
resourcePolicyType = strings.ToLower(strings.TrimSpace(resourcePolicyType))
return resourcePolicyType == kube.KeepPolicy
}

type labelInjectingClient struct {
kube.Interface
labels map[string]string
}

func NewLabelInjectingClient(base kube.Interface, labels map[string]string) kube.Interface {
return &labelInjectingClient{
Interface: base,
labels: labels,
}
}

func (c *labelInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) {
resourceList, err := c.Interface.Build(reader, validate)
if err != nil {
return resourceList, err
}
err = resourceList.Visit(func(r *resource.Info, err error) error {
if err != nil {
return err
}
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r.Object)
if err != nil {
return err
}
u := &unstructured.Unstructured{Object: objMap}
labels := u.GetLabels()
if labels == nil {
labels = map[string]string{}
}
for k, v := range c.labels {
labels[k] = v
}
u.SetLabels(labels)
return nil
})
if err != nil {
return nil, err
}
return resourceList, nil
}
39 changes: 4 additions & 35 deletions internal/helm/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package controller

import (
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -31,7 +30,6 @@ import (
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
ctrlpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

Expand Down Expand Up @@ -71,10 +69,6 @@ func Add(mgr manager.Manager, options WatchOptions) error {
SuppressOverrideValues: options.SuppressOverrideValues,
}

// Register the GVK with the schema
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
metav1.AddToGroupVersion(mgr.GetScheme(), options.GVK.GroupVersion())

c, err := controller.New(controllerName, mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Expand All @@ -86,18 +80,7 @@ func Add(mgr manager.Manager, options WatchOptions) error {
o := &unstructured.Unstructured{}
o.SetGroupVersionKind(options.GVK)

var preds []ctrlpredicate.Predicate
p, err := parsePredicateSelector(options.Selector)

if err != nil {
return err
}

if p != nil {
preds = append(preds, p)
}

if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}, preds...); err != nil {
if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}); err != nil {
return err
}

Expand All @@ -110,29 +93,15 @@ func Add(mgr manager.Manager, options WatchOptions) error {
return nil
}

// parsePredicateSelector parses the selector in the WatchOptions and creates a predicate
// that is used to filter resources based on the specified selector
func parsePredicateSelector(selector metav1.LabelSelector) (ctrlpredicate.Predicate, error) {
// If a selector has been specified in watches.yaml, add it to the watch's predicates.
if !reflect.ValueOf(selector).IsZero() {
p, err := ctrlpredicate.LabelSelectorPredicate(selector)
if err != nil {
return nil, fmt.Errorf("error constructing predicate from watches selector: %v", err)
}
return p, nil
}
return nil, nil
}

// watchDependentResources adds a release hook function to the HelmOperatorReconciler
// that adds watches for resources in released Helm charts.
func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c controller.Controller) {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)

var m sync.RWMutex
watches := map[schema.GroupVersionKind]struct{}{}
releaseHook := func(release *rpb.Release) error {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)
owner.SetNamespace(release.Namespace)
resources := releaseutil.SplitManifests(release.Manifest)
for _, resource := range resources {
var u unstructured.Unstructured
Expand Down
39 changes: 0 additions & 39 deletions internal/helm/controller/controller_test.go

This file was deleted.

4 changes: 4 additions & 0 deletions internal/helm/release/manager_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues
return nil, fmt.Errorf("failed to load chart dir: %w", err)
}

actionConfig.KubeClient = client.NewLabelInjectingClient(actionConfig.KubeClient, map[string]string{
"helm.sdk.operatorframework.io/chart": crChart.Name(),
})

releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr)
if err != nil {
return nil, fmt.Errorf("failed to get helm release name: %w", err)
Expand Down

0 comments on commit ce8c010

Please sign in to comment.