Skip to content

Commit

Permalink
helm-operator: reduce cache memory footprint
Browse files Browse the repository at this point in the history
Make use of label selectors used by informers for both the primary CR
and for chart manifest objects

Signed-off-by: Joe Lanford <[email protected]>
  • Loading branch information
joelanford committed Oct 4, 2023
1 parent d21ed64 commit e4b6554
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 106 deletions.
9 changes: 9 additions & 0 deletions changelog/fragments/helm-operator-cache-selectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
entries:
- description: >
(helm): Use informer cache label selectors to reduce memory consumption.
kind: bugfix
breaking: false
- description: >
(helm): Fix bug with detection of owner reference support when setting up dynamic watches
kind: bugfix
breaking: false
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
k8s.io/client-go v0.26.2
k8s.io/kubectl v0.26.2
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
sigs.k8s.io/controller-runtime v0.14.5
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/controller-tools v0.11.3
sigs.k8s.io/kubebuilder/v3 v3.9.1
sigs.k8s.io/yaml v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1658,8 +1658,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo=
sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s=
sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-tools v0.11.3 h1:T1xzLkog9saiyQSLz1XOImu4OcbdXWytc5cmYsBeBiE=
sigs.k8s.io/controller-tools v0.11.3/go.mod h1:qcfX7jfcfYD/b7lAhvqAyTbt/px4GpvN88WKLFFv7p8=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
Expand Down
113 changes: 86 additions & 27 deletions internal/cmd/helm-operator/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -45,6 +46,10 @@ import (
"github.com/operator-framework/operator-sdk/internal/helm/watches"
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
sdkVersion "github.com/operator-framework/operator-sdk/internal/version"
"helm.sh/helm/v3/pkg/chart/loader"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

var log = logf.Log.WithName("cmd")
Expand Down Expand Up @@ -136,6 +141,23 @@ func run(cmd *cobra.Command, f *flags.Flags) {
// Set default manager options
options = f.ToManagerOptions(options)

if options.Scheme == nil {
options.Scheme = apimachruntime.NewScheme()
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to load watches file.")
os.Exit(1)
}

watchNamespaces := getWatchNamespaces(options.Namespace)
options.NewCache, err = buildNewCacheFunc(watchNamespaces, ws, options.Scheme)
if err != nil {
log.Error(err, "Failed to create NewCache function for manager.")
os.Exit(1)
}

if options.NewClient == nil {
options.NewClient = func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
// Create the Client for Write operations.
Expand All @@ -152,27 +174,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
})
}
}
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
options.Namespace = metav1.NamespaceAll
} else {
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ","))
} else {
log.Info("Watching single namespace.")
options.Namespace = namespace
}
}
} else if options.Namespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
options.Namespace = metav1.NamespaceAll
}

mgr, err := manager.New(cfg, options)
if err != nil {
Expand All @@ -189,11 +190,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
os.Exit(1)
}

ws, err := watches.Load(f.WatchesFile)
if err != nil {
log.Error(err, "Failed to create new manager factories.")
os.Exit(1)
}
acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger())
if err != nil {
log.Error(err, "Failed to create Helm action config getter")
Expand All @@ -207,7 +203,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
}

err := controller.Add(mgr, controller.WatchOptions{
Namespace: namespace,
GVK: w.GroupVersionKind,
ManagerFactory: release.NewManagerFactory(mgr, acg, w.ChartDir),
ReconcilePeriod: reconcilePeriod,
Expand Down Expand Up @@ -250,3 +245,67 @@ func exitIfUnsupported(options manager.Options) {
os.Exit(1)
}
}

func getWatchNamespaces(defaultNamespace string) []string {
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
return []string{metav1.NamespaceAll}
}
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
return strings.Split(namespace, ",")
}
log.Info("Watching single namespace.")
return []string{namespace}
}
if defaultNamespace == "" {
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
return []string{metav1.NamespaceAll}
}
return []string{defaultNamespace}
}

func buildNewCacheFunc(watchNamespaces []string, ws []watches.Watch, sch *apimachruntime.Scheme) (cache.NewCacheFunc, error) {
selectorsByObject := cache.SelectorsByObject{}
chartNames := make([]string, 0, len(ws))
for _, w := range ws {
sch.AddKnownTypeWithName(w.GroupVersionKind, &unstructured.Unstructured{})

crObj := &unstructured.Unstructured{}
crObj.SetGroupVersionKind(w.GroupVersionKind)
sel, err := metav1.LabelSelectorAsSelector(&w.Selector)
if err != nil {
return nil, fmt.Errorf("unable to parse watch selector for %s: %v", w.GroupVersionKind, err)
}
selectorsByObject[crObj] = cache.ObjectSelector{Label: sel}

chrt, err := loader.LoadDir(w.ChartDir)
if err != nil {
return nil, fmt.Errorf("unable to load chart for %s: %v", w.GroupVersionKind, err)
}
chartNames = append(chartNames, chrt.Name())

}
req, err := labels.NewRequirement("helm.sdk.operatorframework.io/chart", selection.In, chartNames)
if err != nil {
return nil, fmt.Errorf("unable to create label requirement for cache default selector: %v", err)
}
defaultSelector := labels.NewSelector().Add(*req)

return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.SelectorsByObject = selectorsByObject
opts.DefaultSelector = cache.ObjectSelector{Label: defaultSelector}
if len(watchNamespaces) > 1 {
return cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)
}
if len(watchNamespaces) == 1 {
opts.Namespace = watchNamespaces[0]
}
return cache.New(config, opts)
}, nil
}
42 changes: 42 additions & 0 deletions internal/helm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,45 @@ func containsResourcePolicyKeep(annotations map[string]string) bool {
resourcePolicyType = strings.ToLower(strings.TrimSpace(resourcePolicyType))
return resourcePolicyType == kube.KeepPolicy
}

type labelInjectingClient struct {
kube.Interface
labels map[string]string
}

func NewLabelInjectingClient(base kube.Interface, labels map[string]string) kube.Interface {
return &labelInjectingClient{
Interface: base,
labels: labels,
}
}

func (c *labelInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) {
resourceList, err := c.Interface.Build(reader, validate)
if err != nil {
return resourceList, err
}
err = resourceList.Visit(func(r *resource.Info, err error) error {
if err != nil {
return err
}
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r.Object)
if err != nil {
return err
}
u := &unstructured.Unstructured{Object: objMap}
labels := u.GetLabels()
if labels == nil {
labels = map[string]string{}
}
for k, v := range c.labels {
labels[k] = v
}
u.SetLabels(labels)
return nil
})
if err != nil {
return nil, err
}
return resourceList, nil
}
42 changes: 5 additions & 37 deletions internal/helm/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package controller

import (
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -31,7 +30,6 @@ import (
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
ctrlpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

Expand All @@ -46,7 +44,6 @@ var log = logf.Log.WithName("helm.controller")
// WatchOptions contains the necessary values to create a new controller that
// manages helm releases in a particular namespace based on a GVK watch.
type WatchOptions struct {
Namespace string
GVK schema.GroupVersionKind
ManagerFactory release.ManagerFactory
ReconcilePeriod time.Duration
Expand All @@ -71,10 +68,6 @@ func Add(mgr manager.Manager, options WatchOptions) error {
SuppressOverrideValues: options.SuppressOverrideValues,
}

// Register the GVK with the schema
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
metav1.AddToGroupVersion(mgr.GetScheme(), options.GVK.GroupVersion())

c, err := controller.New(controllerName, mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Expand All @@ -86,18 +79,7 @@ func Add(mgr manager.Manager, options WatchOptions) error {
o := &unstructured.Unstructured{}
o.SetGroupVersionKind(options.GVK)

var preds []ctrlpredicate.Predicate
p, err := parsePredicateSelector(options.Selector)

if err != nil {
return err
}

if p != nil {
preds = append(preds, p)
}

if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}, preds...); err != nil {
if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}); err != nil {
return err
}

Expand All @@ -106,33 +88,19 @@ func Add(mgr manager.Manager, options WatchOptions) error {
}

log.Info("Watching resource", "apiVersion", options.GVK.GroupVersion(), "kind",
options.GVK.Kind, "namespace", options.Namespace, "reconcilePeriod", options.ReconcilePeriod.String())
options.GVK.Kind, "reconcilePeriod", options.ReconcilePeriod.String())
return nil
}

// parsePredicateSelector parses the selector in the WatchOptions and creates a predicate
// that is used to filter resources based on the specified selector
func parsePredicateSelector(selector metav1.LabelSelector) (ctrlpredicate.Predicate, error) {
// If a selector has been specified in watches.yaml, add it to the watch's predicates.
if !reflect.ValueOf(selector).IsZero() {
p, err := ctrlpredicate.LabelSelectorPredicate(selector)
if err != nil {
return nil, fmt.Errorf("error constructing predicate from watches selector: %v", err)
}
return p, nil
}
return nil, nil
}

// watchDependentResources adds a release hook function to the HelmOperatorReconciler
// that adds watches for resources in released Helm charts.
func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c controller.Controller) {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)

var m sync.RWMutex
watches := map[schema.GroupVersionKind]struct{}{}
releaseHook := func(release *rpb.Release) error {
owner := &unstructured.Unstructured{}
owner.SetGroupVersionKind(r.GVK)
owner.SetNamespace(release.Namespace)
resources := releaseutil.SplitManifests(release.Manifest)
for _, resource := range resources {
var u unstructured.Unstructured
Expand Down
39 changes: 0 additions & 39 deletions internal/helm/controller/controller_test.go

This file was deleted.

4 changes: 4 additions & 0 deletions internal/helm/release/manager_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues
return nil, fmt.Errorf("failed to load chart dir: %w", err)
}

actionConfig.KubeClient = client.NewLabelInjectingClient(actionConfig.KubeClient, map[string]string{
"helm.sdk.operatorframework.io/chart": crChart.Name(),
})

releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr)
if err != nil {
return nil, fmt.Errorf("failed to get helm release name: %w", err)
Expand Down

0 comments on commit e4b6554

Please sign in to comment.